ibzib commented on a change in pull request #14409:
URL: https://github.com/apache/beam/pull/14409#discussion_r606329603
##########
File path:
runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
##########
@@ -246,17 +258,28 @@ public SparkPipelineResult run(final Pipeline pipeline) {
result = new SparkPipelineResult.BatchMode(startPipeline, jsc);
}
- if (mOptions.getEnableSparkMetricSinks()) {
- registerMetricsSource(mOptions.getAppName());
+ if (pipelineOptions.getEnableSparkMetricSinks()) {
+ registerMetricsSource(pipelineOptions.getAppName());
}
// it would have been better to create MetricsPusher from runner-core but we need
// runner-specific
// MetricsContainerStepMap
MetricsPusher metricsPusher =
new MetricsPusher(
- MetricsAccumulator.getInstance().value(), mOptions.as(MetricsOptions.class), result);
+ MetricsAccumulator.getInstance().value(),
+ pipelineOptions.as(MetricsOptions.class),
+ result);
metricsPusher.start();
+
+ if (eventLoggingListener != null && jsc != null) {
Review comment:
I'm curious how `jsc` could be null here?
##########
File path:
runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
##########
@@ -97,7 +102,7 @@
private static final Logger LOG = LoggerFactory.getLogger(SparkRunner.class);
/** Options used in this pipeline runner. */
- private final SparkPipelineOptions mOptions;
+ private final SparkPipelineOptions pipelineOptions;
Review comment:
+1 😄
##########
File path:
runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
##########
@@ -34,18 +34,6 @@
*/
public interface SparkPipelineOptions extends SparkCommonPipelineOptions {
- @Description("Set it to true if event logs should be saved to Spark History Server directory")
Review comment:
Makes sense. 👍