ibzib commented on a change in pull request #13743:
URL: https://github.com/apache/beam/pull/13743#discussion_r564186788



##########
File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
##########
@@ -34,6 +34,18 @@
  */
 public interface SparkPipelineOptions extends SparkCommonPipelineOptions {
 
+  @Description("Set it to true if event logs should be saved to Spark History 
Server directory")
+  @Default.String("false")
+  String getEventLogEnabled();

Review comment:
       This should be a boolean, not a String.

##########
File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
##########
@@ -123,10 +137,84 @@ public PortablePipelineResult run(RunnerApi.Pipeline 
pipeline, JobInfo jobInfo)
         "Will stage {} files. (Enable logging at DEBUG level to see which 
files will be staged.)",
         pipelineOptions.getFilesToStage().size());
     LOG.debug("Staging files: {}", pipelineOptions.getFilesToStage());
-
     PortablePipelineResult result;
     final JavaSparkContext jsc = 
SparkContextFactory.getSparkContext(pipelineOptions);
 
+    EventLoggingListener eventLoggingListener;
+    String jobId = jobInfo.jobId();
+    String jobName = jobInfo.jobName();
+    long startTime = Instant.now().getMillis();
+    String sparkUser = jsc.sparkUser();
+    String sparkMaster = "";
+    String sparkExecutorID = "";
+    Tuple2<String, String>[] sparkConfList = jsc.getConf().getAll();
+    for (Tuple2<String, String> sparkConf : sparkConfList) {
+      if (sparkConf._1().equals("spark.master")) {
+        sparkMaster = sparkConf._2();
+      } else if (sparkConf._1().equals("spark.executor.id")) {
+        sparkExecutorID = sparkConf._2();
+      }
+    }
+    try {
+      URI eventLogDirectory = new URI(pipelineOptions.getSparkHistoryDir());
+      File eventLogDirectoryFile = new File(eventLogDirectory.getPath());
+      if (eventLogDirectoryFile.exists()
+          && eventLogDirectoryFile.isDirectory()
+          && pipelineOptions.getEventLogEnabled().equals("true")) {
+        eventLoggingListener =
+            new EventLoggingListener(
+                jobId,
+                new scala.Option<String>() {
+                  @Override
+                  public boolean isEmpty() {
+                    return false;
+                  }
+
+                  @Override
+                  public String get() {
+                    return jobName;
+                  }
+
+                  @Override
+                  public Object productElement(int i) {
+                    return null;
+                  }
+
+                  @Override
+                  public int productArity() {
+                    return 0;
+                  }
+
+                  @Override
+                  public boolean canEqual(Object o) {
+                    return false;
+                  }
+                },
+                eventLogDirectory,
+                jsc.getConf(),
+                jsc.hadoopConfiguration());
+      } else {
+        eventLoggingListener = null;

Review comment:
       ```suggestion
   ```

##########
File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
##########
@@ -123,10 +140,79 @@ public PortablePipelineResult run(RunnerApi.Pipeline 
pipeline, JobInfo jobInfo)
         "Will stage {} files. (Enable logging at DEBUG level to see which 
files will be staged.)",
         pipelineOptions.getFilesToStage().size());
     LOG.debug("Staging files: {}", pipelineOptions.getFilesToStage());
-
     PortablePipelineResult result;
     final JavaSparkContext jsc = 
SparkContextFactory.getSparkContext(pipelineOptions);
 
+    EventLoggingListener eventLoggingListener;
+    String jobId = jobInfo.jobId();
+    String jobName = jobInfo.jobName();
+    Long startTime = jsc.startTime();
+    String sparkUser = jsc.sparkUser();
+    String sparkMaster = "";
+    String sparkExecutorID = "";
+    Tuple2<String, String>[] sparkConfList = jsc.getConf().getAll();
+    for (Tuple2<String, String> sparkConf : sparkConfList) {
+      if (sparkConf._1().equals("spark.master")) {
+        sparkMaster = sparkConf._2();
+      } else if (sparkConf._1().equals("spark.executor.id")) {
+        sparkExecutorID = sparkConf._2();
+      }
+    }
+    try {
+      URI eventLogDirectory = new URI(pipelineOptions.getSparkHistoryDir());

Review comment:
       I missed that. Sounds good.

##########
File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
##########
@@ -123,10 +140,79 @@ public PortablePipelineResult run(RunnerApi.Pipeline 
pipeline, JobInfo jobInfo)
         "Will stage {} files. (Enable logging at DEBUG level to see which 
files will be staged.)",
         pipelineOptions.getFilesToStage().size());
     LOG.debug("Staging files: {}", pipelineOptions.getFilesToStage());
-
     PortablePipelineResult result;
     final JavaSparkContext jsc = 
SparkContextFactory.getSparkContext(pipelineOptions);
 
+    EventLoggingListener eventLoggingListener;
+    String jobId = jobInfo.jobId();
+    String jobName = jobInfo.jobName();
+    Long startTime = jsc.startTime();
+    String sparkUser = jsc.sparkUser();
+    String sparkMaster = "";
+    String sparkExecutorID = "";
+    Tuple2<String, String>[] sparkConfList = jsc.getConf().getAll();
+    for (Tuple2<String, String> sparkConf : sparkConfList) {
+      if (sparkConf._1().equals("spark.master")) {
+        sparkMaster = sparkConf._2();
+      } else if (sparkConf._1().equals("spark.executor.id")) {
+        sparkExecutorID = sparkConf._2();
+      }
+    }
+    try {
+      URI eventLogDirectory = new URI(pipelineOptions.getSparkHistoryDir());
+      File eventLogDirectoryFile = new File(eventLogDirectory.getPath());
+      if (eventLogDirectoryFile.exists() && 
eventLogDirectoryFile.isDirectory()) {
+        eventLoggingListener =
+            new EventLoggingListener(
+                jobId,
+                new scala.Option<String>() {
+                  @Override
+                  public boolean isEmpty() {
+                    return false;
+                  }
+
+                  @Override
+                  public String get() {
+                    return jobName;
+                  }
+
+                  @Override
+                  public Object productElement(int i) {
+                    return null;
+                  }
+
+                  @Override
+                  public int productArity() {
+                    return 0;
+                  }
+
+                  @Override
+                  public boolean canEqual(Object o) {
+                    return false;
+                  }
+                },
+                eventLogDirectory,
+                jsc.getConf(),
+                jsc.hadoopConfiguration());
+      } else {
+        eventLoggingListener = null;
+      }
+    } catch (URISyntaxException e) {
+      e.printStackTrace();
+      eventLoggingListener = null;
+    }
+    if (eventLoggingListener != null) {

Review comment:
       You should be able to remove the `if (eventLoggingListener != null)` 
check altogether; instead it should be impossible for `eventLoggingListener` to 
ever be null. (Ideally we'd have the null checker verify that, but it is 
ignored on this class for some other reason.)
   
   One solution is to create an EventLoggingListener regardless of the value of 
`getEventLogEnabled`, and just don't start it or do anything else with it if 
`getEventLogEnabled` is false.

##########
File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
##########
@@ -123,10 +137,84 @@ public PortablePipelineResult run(RunnerApi.Pipeline 
pipeline, JobInfo jobInfo)
         "Will stage {} files. (Enable logging at DEBUG level to see which 
files will be staged.)",
         pipelineOptions.getFilesToStage().size());
     LOG.debug("Staging files: {}", pipelineOptions.getFilesToStage());
-
     PortablePipelineResult result;
     final JavaSparkContext jsc = 
SparkContextFactory.getSparkContext(pipelineOptions);
 
+    EventLoggingListener eventLoggingListener;
+    String jobId = jobInfo.jobId();
+    String jobName = jobInfo.jobName();
+    long startTime = Instant.now().getMillis();
+    String sparkUser = jsc.sparkUser();
+    String sparkMaster = "";
+    String sparkExecutorID = "";
+    Tuple2<String, String>[] sparkConfList = jsc.getConf().getAll();
+    for (Tuple2<String, String> sparkConf : sparkConfList) {
+      if (sparkConf._1().equals("spark.master")) {
+        sparkMaster = sparkConf._2();
+      } else if (sparkConf._1().equals("spark.executor.id")) {
+        sparkExecutorID = sparkConf._2();
+      }
+    }
+    try {
+      URI eventLogDirectory = new URI(pipelineOptions.getSparkHistoryDir());
+      File eventLogDirectoryFile = new File(eventLogDirectory.getPath());
+      if (eventLogDirectoryFile.exists()
+          && eventLogDirectoryFile.isDirectory()
+          && pipelineOptions.getEventLogEnabled().equals("true")) {

Review comment:
       This logic is more complicated than it needs to be. `if 
(pipelineOptions.getEventLogEnabled())` should be checked before doing anything 
related to event logging.

##########
File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
##########
@@ -123,10 +140,79 @@ public PortablePipelineResult run(RunnerApi.Pipeline 
pipeline, JobInfo jobInfo)
         "Will stage {} files. (Enable logging at DEBUG level to see which 
files will be staged.)",
         pipelineOptions.getFilesToStage().size());
     LOG.debug("Staging files: {}", pipelineOptions.getFilesToStage());
-
     PortablePipelineResult result;
     final JavaSparkContext jsc = 
SparkContextFactory.getSparkContext(pipelineOptions);
 
+    EventLoggingListener eventLoggingListener;
+    String jobId = jobInfo.jobId();
+    String jobName = jobInfo.jobName();
+    Long startTime = jsc.startTime();
+    String sparkUser = jsc.sparkUser();
+    String sparkMaster = "";
+    String sparkExecutorID = "";
+    Tuple2<String, String>[] sparkConfList = jsc.getConf().getAll();
+    for (Tuple2<String, String> sparkConf : sparkConfList) {
+      if (sparkConf._1().equals("spark.master")) {
+        sparkMaster = sparkConf._2();
+      } else if (sparkConf._1().equals("spark.executor.id")) {
+        sparkExecutorID = sparkConf._2();

Review comment:
       The only place you use `sparkExecutorID` is to log the `onExecutorAdded` 
event. So I imagine there should be one `onExecutorAdded` event per distinct 
`sparkExecutorID`.

##########
File path: runners/spark/job-server/build.gradle
##########
@@ -73,6 +73,8 @@ runShadow {
     args += 
["--clean-artifacts-per-job=${project.property('cleanArtifactsPerJob')}"]
   if (project.hasProperty('sparkMasterUrl'))
     args += ["--spark-master-url=${project.property('sparkMasterUrl')}"]
+  if (project.hasProperty('sparkHistoryDir'))

Review comment:
       It's fine to add the pipeline options for the Python and Go SDKs in 
separate PRs.

##########
File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java
##########
@@ -38,9 +38,35 @@
 /**
  * An adapter between the {@link MetricsContainerStepMap} and Codahale's 
{@link Metric} interface.
  */
-class SparkBeamMetric implements Metric {
+public class SparkBeamMetric implements Metric {
   private static final String ILLEGAL_CHARACTERS = "[^A-Za-z0-9-]";
 
+  public static Map<String, String> renderAll(MetricResults metricResults) {

Review comment:
       What's the difference between this and the existing `renderAll` method?

##########
File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
##########
@@ -123,10 +140,79 @@ public PortablePipelineResult run(RunnerApi.Pipeline 
pipeline, JobInfo jobInfo)
         "Will stage {} files. (Enable logging at DEBUG level to see which 
files will be staged.)",
         pipelineOptions.getFilesToStage().size());
     LOG.debug("Staging files: {}", pipelineOptions.getFilesToStage());
-
     PortablePipelineResult result;
     final JavaSparkContext jsc = 
SparkContextFactory.getSparkContext(pipelineOptions);
 
+    EventLoggingListener eventLoggingListener;
+    String jobId = jobInfo.jobId();
+    String jobName = jobInfo.jobName();
+    Long startTime = jsc.startTime();
+    String sparkUser = jsc.sparkUser();
+    String sparkMaster = "";
+    String sparkExecutorID = "";
+    Tuple2<String, String>[] sparkConfList = jsc.getConf().getAll();
+    for (Tuple2<String, String> sparkConf : sparkConfList) {
+      if (sparkConf._1().equals("spark.master")) {
+        sparkMaster = sparkConf._2();
+      } else if (sparkConf._1().equals("spark.executor.id")) {
+        sparkExecutorID = sparkConf._2();
+      }
+    }
+    try {
+      URI eventLogDirectory = new URI(pipelineOptions.getSparkHistoryDir());
+      File eventLogDirectoryFile = new File(eventLogDirectory.getPath());
+      if (eventLogDirectoryFile.exists() && 
eventLogDirectoryFile.isDirectory()) {
+        eventLoggingListener =
+            new EventLoggingListener(
+                jobId,
+                new scala.Option<String>() {

Review comment:
       You shouldn't need to inherit from `Option` or `Some`, can't you just 
instantiate it directly? 
https://stackoverflow.com/questions/5287451/how-to-call-scalas-option-constructors-from-java

##########
File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
##########
@@ -123,10 +137,84 @@ public PortablePipelineResult run(RunnerApi.Pipeline 
pipeline, JobInfo jobInfo)
         "Will stage {} files. (Enable logging at DEBUG level to see which 
files will be staged.)",
         pipelineOptions.getFilesToStage().size());
     LOG.debug("Staging files: {}", pipelineOptions.getFilesToStage());
-
     PortablePipelineResult result;
     final JavaSparkContext jsc = 
SparkContextFactory.getSparkContext(pipelineOptions);
 
+    EventLoggingListener eventLoggingListener;
+    String jobId = jobInfo.jobId();
+    String jobName = jobInfo.jobName();
+    long startTime = Instant.now().getMillis();
+    String sparkUser = jsc.sparkUser();
+    String sparkMaster = "";
+    String sparkExecutorID = "";
+    Tuple2<String, String>[] sparkConfList = jsc.getConf().getAll();
+    for (Tuple2<String, String> sparkConf : sparkConfList) {
+      if (sparkConf._1().equals("spark.master")) {
+        sparkMaster = sparkConf._2();
+      } else if (sparkConf._1().equals("spark.executor.id")) {
+        sparkExecutorID = sparkConf._2();
+      }
+    }
+    try {
+      URI eventLogDirectory = new URI(pipelineOptions.getSparkHistoryDir());
+      File eventLogDirectoryFile = new File(eventLogDirectory.getPath());
+      if (eventLogDirectoryFile.exists()
+          && eventLogDirectoryFile.isDirectory()
+          && pipelineOptions.getEventLogEnabled().equals("true")) {
+        eventLoggingListener =
+            new EventLoggingListener(
+                jobId,
+                new scala.Option<String>() {
+                  @Override
+                  public boolean isEmpty() {
+                    return false;
+                  }
+
+                  @Override
+                  public String get() {
+                    return jobName;
+                  }
+
+                  @Override
+                  public Object productElement(int i) {
+                    return null;
+                  }
+
+                  @Override
+                  public int productArity() {
+                    return 0;
+                  }
+
+                  @Override
+                  public boolean canEqual(Object o) {
+                    return false;
+                  }
+                },
+                eventLogDirectory,
+                jsc.getConf(),
+                jsc.hadoopConfiguration());
+      } else {
+        eventLoggingListener = null;
+        if (pipelineOptions.getEventLogEnabled().equals("true")) {
+          throw new RuntimeException("Failed to initialize Spark History Log 
Directory");
+        }
+      }
+    } catch (URISyntaxException e) {
+      e.printStackTrace();

Review comment:
       This exception should not be ignored; remove this `catch` block.

##########
File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
##########
@@ -213,6 +301,97 @@ public PortablePipelineResult run(RunnerApi.Pipeline 
pipeline, JobInfo jobInfo)
             result);
     metricsPusher.start();
 
+    if (eventLoggingListener != null) {
+      eventLoggingListener.onApplicationStart(
+          new SparkListenerApplicationStart(
+              jobId,
+              new scala.Option<String>() {
+                @Override
+                public boolean isEmpty() {
+                  return false;
+                }
+
+                @Override
+                public String get() {
+                  return jobName;
+                }
+
+                @Override
+                public Object productElement(int i) {
+                  return null;
+                }
+
+                @Override
+                public int productArity() {
+                  return 0;
+                }
+
+                @Override
+                public boolean canEqual(Object o) {
+                  return false;
+                }
+              },
+              startTime,
+              sparkUser,
+              new scala.Option<String>() {
+                @Override
+                public boolean isEmpty() {
+                  return false;
+                }
+
+                @Override
+                public String get() {
+                  return jobName;
+                }
+
+                @Override
+                public Object productElement(int i) {
+                  return null;
+                }
+
+                @Override
+                public int productArity() {
+                  return 0;
+                }
+
+                @Override
+                public boolean canEqual(Object o) {
+                  return false;
+                }
+              },
+              new scala.Option<Map<String, String>>() {
+                @Override
+                public boolean isEmpty() {
+                  return false;
+                }
+
+                @Override
+                public Map<String, String> get() {
+                  return JavaConverters.mapAsScalaMapConverter(
+                          SparkBeamMetric.renderAll(result.metrics()))
+                      .asScala();
+                }
+
+                @Override
+                public Object productElement(int i) {
+                  return null;
+                }
+
+                @Override
+                public int productArity() {
+                  return 0;
+                }
+
+                @Override
+                public boolean canEqual(Object o) {
+                  return false;
+                }
+              }));
+      eventLoggingListener.onApplicationEnd(
+          new SparkListenerApplicationEnd(Instant.now().getMillis()));

Review comment:
       I'm pretty sure pipeline end time (and also start time for that matter) 
is itself a metric. To keep things consistent, it'd be better to use that 
metric here instead of `Instant.now()`.

##########
File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
##########
@@ -268,12 +447,30 @@ public static void main(String[] args) throws Exception {
   }
 
   private static class SparkPipelineRunnerConfiguration {
+    @Option(
+        name = "--event-log-enabled",
+        usage = "Set it to true if event logs should be saved to Spark History 
Server directory")
+    private String eventLogEnabled = "false";
+
+    String getEventLogEnabled() {
+      return this.eventLogEnabled;
+    }
+
     @Option(
         name = "--base-job-name",
         usage =
             "The job to run. This must correspond to a subdirectory of the 
jar's BEAM-PIPELINE "
                 + "directory. *Only needs to be specified if the jar contains 
multiple pipelines.*")
     private String baseJobName = null;
+
+    @Option(
+        name = "--spark-history-dir",
+        usage = "Spark history dir to store logs (e.g. /tmp/spark-events/)")
+    private String sparkHistoryDir = "/tmp/spark-events/";

Review comment:
       Please remove the `--spark-history-dir` option from 
`SparkPipelineRunnerConfiguration` since it is unused and unneeded.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to