[ 
https://issues.apache.org/jira/browse/BEAM-3371?focusedWorklogId=150328&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-150328
 ]

ASF GitHub Bot logged work on BEAM-3371:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/Oct/18 09:56
            Start Date: 02/Oct/18 09:56
    Worklog Time Spent: 10m 
      Work Description: aromanenko-dev closed pull request #6244: [BEAM-3371] 
Enable running integration tests on Spark
URL: https://github.com/apache/beam/pull/6244
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy 
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 5b4154d957d..454eb025c05 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -1109,6 +1109,17 @@ artifactId=${project.name}
           testCompile it.project(path: ":beam-runners-flink_2.11", 
configuration: 'shadowTest')
         }
 
+        if (runner?.equalsIgnoreCase('spark')) {
+          testCompile it.project(path: ":beam-runners-spark", configuration: 
'shadowTest')
+          testCompile project.library.java.spark_core
+          testCompile project.library.java.spark_streaming
+
+          // Testing the Spark runner causes a StackOverflowError if 
slf4j-jdk14 is on the classpath
+          project.configurations.testRuntimeClasspath {
+            exclude group: "org.slf4j", module: "slf4j-jdk14"
+          }
+        }
+
         /* include dependencies required by filesystems */
         if (filesystem?.equalsIgnoreCase('hdfs')) {
           testCompile it.project(path: 
":beam-sdks-java-io-hadoop-file-system", configuration: 'shadowTest')
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineResources.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineResources.java
index f718b70bcf3..c982e62a330 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineResources.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineResources.java
@@ -17,12 +17,24 @@
  */
 package org.apache.beam.runners.core.construction;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.fasterxml.jackson.core.Base64Variants;
+import com.google.common.base.Strings;
+import com.google.common.hash.Funnels;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
 import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.util.ZipFiles;
 
 /** Utilities for working with classpath resources for pipelines. */
 public class PipelineResources {
@@ -57,4 +69,60 @@
     }
     return files;
   }
+
+  /**
+   * Goes through the list of files that need to be staged on runner. Removes 
nonexistent
+   * directories and packages existing ones. This is necessary for runners 
that require filesToStage
+   * to be jars only.
+   *
+   * @param resourcesToStage list of resources that need to be staged
+   * @param tmpJarLocation temporary directory to store the jars
+   * @return A list of absolute paths to resources (jar files)
+   */
+  public static List<String> prepareFilesForStaging(
+      List<String> resourcesToStage, String tmpJarLocation) {
+    return resourcesToStage
+        .stream()
+        .map(File::new)
+        .filter(File::exists)
+        .map(
+            file ->
+                file.isDirectory()
+                    ? packageDirectoriesToStage(file, tmpJarLocation)
+                    : file.getAbsolutePath())
+        .collect(Collectors.toList());
+  }
+
+  private static String packageDirectoriesToStage(File directoryToStage, 
String tmpJarLocation) {
+    String hash = calculateDirectoryContentHash(directoryToStage);
+    String pathForJar = getUniqueJarPath(hash, tmpJarLocation);
+    zipDirectory(directoryToStage, pathForJar);
+    return pathForJar;
+  }
+
+  private static String calculateDirectoryContentHash(File directoryToStage) {
+    Hasher hasher = Hashing.md5().newHasher();
+    try (OutputStream hashStream = Funnels.asOutputStream(hasher)) {
+      ZipFiles.zipDirectory(directoryToStage, hashStream);
+      return Base64Variants.MODIFIED_FOR_URL.encode(hasher.hash().asBytes());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static String getUniqueJarPath(String contentHash, String 
tmpJarLocation) {
+    checkArgument(
+        !Strings.isNullOrEmpty(tmpJarLocation),
+        "Please provide temporary location for storing the jar files.");
+
+    return String.format("%s%s.jar", tmpJarLocation, contentHash);
+  }
+
+  private static void zipDirectory(File directoryToStage, String 
uniqueDirectoryPath) {
+    try {
+      ZipFiles.zipDirectory(directoryToStage, new 
FileOutputStream(uniqueDirectoryPath));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
 }
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineResourcesTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineResourcesTest.java
index cf33dc55ed4..1fa6d4314ab 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineResourcesTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineResourcesTest.java
@@ -17,12 +17,19 @@
  */
 package org.apache.beam.runners.core.construction;
 
+import static junit.framework.TestCase.assertTrue;
+import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 
 import com.google.common.collect.ImmutableList;
 import java.io.File;
+import java.io.IOException;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -68,4 +75,43 @@ public void detectClassPathResourceWithNonFileResources() 
throws Exception {
 
     PipelineResources.detectClassPathResourcesToStage(classLoader);
   }
+
+  @Test
+  public void testRemovingNonexistentFilesFromFilesToStage() throws 
IOException {
+    String nonexistentFilePath = tmpFolder.getRoot().getPath() + 
"/nonexistent/file";
+    String existingFilePath = 
tmpFolder.newFile("existingFile").getAbsolutePath();
+    String temporaryLocation = tmpFolder.newFolder().getAbsolutePath();
+
+    List<String> filesToStage = Arrays.asList(nonexistentFilePath, 
existingFilePath);
+    List<String> expectedFilesToStage = Arrays.asList(existingFilePath);
+
+    List<String> result = 
PipelineResources.prepareFilesForStaging(filesToStage, temporaryLocation);
+
+    assertThat(result, is(expectedFilesToStage));
+  }
+
+  @Test
+  public void testPackagingDirectoryResourceToJarFile() throws IOException {
+    String directoryPath = tmpFolder.newFolder().getAbsolutePath();
+    String temporaryLocation = tmpFolder.newFolder().getAbsolutePath();
+
+    List<String> filesToStage = new ArrayList<>();
+    filesToStage.add(directoryPath);
+
+    List<String> result = 
PipelineResources.prepareFilesForStaging(filesToStage, temporaryLocation);
+
+    assertTrue(new File(result.get(0)).exists());
+    assertTrue(result.get(0).matches(".*\\.jar"));
+  }
+
+  @Test
+  public void testIfThrowsWhenThereIsNoTemporaryFolderForJars() throws 
IOException {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Please provide temporary location for storing the 
jar files.");
+
+    List<String> filesToStage = new ArrayList<>();
+    filesToStage.add(tmpFolder.newFolder().getAbsolutePath());
+
+    PipelineResources.prepareFilesForStaging(filesToStage, null);
+  }
 }
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index 282c9bd8618..a5bfe70363e 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -17,22 +17,10 @@
  */
 package org.apache.beam.runners.flink;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import com.fasterxml.jackson.core.Base64Variants;
-import com.google.common.base.Strings;
-import com.google.common.hash.Funnels;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.List;
-import java.util.stream.Collectors;
+import org.apache.beam.runners.core.construction.PipelineResources;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.util.ZipFiles;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -95,73 +83,34 @@ public void translate(Pipeline pipeline) {
     pipeline.replaceAll(
         FlinkTransformOverrides.getDefaultOverrides(translationMode == 
TranslationMode.STREAMING));
 
-    // Local flink configurations work in the same JVM and have no problems 
with improperly
-    // formatted files on classpath (eg. directories with .class files or 
empty directories).
-    // prepareFilesToStage() only when using remote flink cluster.
-    List<String> filesToStage;
-    if (!options.getFlinkMaster().matches("\\[.*\\]")) {
-      filesToStage = prepareFilesToStage();
-    } else {
-      filesToStage = options.getFilesToStage();
-    }
+    prepareFilesToStageForRemoteClusterExecution(options);
 
     FlinkPipelineTranslator translator;
     if (translationMode == TranslationMode.STREAMING) {
       this.flinkStreamEnv =
-          FlinkExecutionEnvironments.createStreamExecutionEnvironment(options, 
filesToStage);
+          FlinkExecutionEnvironments.createStreamExecutionEnvironment(
+              options, options.getFilesToStage());
       translator = new FlinkStreamingPipelineTranslator(flinkStreamEnv, 
options);
     } else {
       this.flinkBatchEnv =
-          FlinkExecutionEnvironments.createBatchExecutionEnvironment(options, 
filesToStage);
+          FlinkExecutionEnvironments.createBatchExecutionEnvironment(
+              options, options.getFilesToStage());
       translator = new FlinkBatchPipelineTranslator(flinkBatchEnv, options);
     }
 
     translator.translate(pipeline);
   }
 
-  private List<String> prepareFilesToStage() {
-    return options
-        .getFilesToStage()
-        .stream()
-        .map(File::new)
-        .filter(File::exists)
-        .map(file -> file.isDirectory() ? packageDirectoriesToStage(file) : 
file.getAbsolutePath())
-        .collect(Collectors.toList());
-  }
-
-  private String packageDirectoriesToStage(File directoryToStage) {
-    String hash = calculateDirectoryContentHash(directoryToStage);
-    String pathForJar = getUniqueJarPath(hash);
-    zipDirectory(directoryToStage, pathForJar);
-    return pathForJar;
-  }
-
-  private String calculateDirectoryContentHash(File directoryToStage) {
-    Hasher hasher = Hashing.md5().newHasher();
-    try (OutputStream hashStream = Funnels.asOutputStream(hasher)) {
-      ZipFiles.zipDirectory(directoryToStage, hashStream);
-      return Base64Variants.MODIFIED_FOR_URL.encode(hasher.hash().asBytes());
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private String getUniqueJarPath(String contentHash) {
-    String tempLocation = options.getTempLocation();
-
-    checkArgument(
-        !Strings.isNullOrEmpty(tempLocation),
-        "Please provide \"tempLocation\" pipeline option. Flink runner needs 
it to store jars "
-            + "made of directories that were on classpath.");
-
-    return String.format("%s%s.jar", tempLocation, contentHash);
-  }
-
-  private void zipDirectory(File directoryToStage, String uniqueDirectoryPath) 
{
-    try {
-      ZipFiles.zipDirectory(directoryToStage, new 
FileOutputStream(uniqueDirectoryPath));
-    } catch (IOException e) {
-      throw new RuntimeException(e);
+  /**
+   * Local configurations work in the same JVM and have no problems with 
improperly formatted files
+   * on classpath (eg. directories with .class files or empty directories). 
Prepare files for
+   * staging only when using remote cluster (passing the master address 
explicitly).
+   */
+  private static void 
prepareFilesToStageForRemoteClusterExecution(FlinkPipelineOptions options) {
+    if 
(!options.getFlinkMaster().matches("\\[auto\\]|\\[collection\\]|\\[local\\]")) {
+      options.setFilesToStage(
+          PipelineResources.prepareFilesForStaging(
+              options.getFilesToStage(), options.getTempLocation()));
     }
   }
 
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java
index 926a89d23a4..f7ea59c2ef6 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java
@@ -17,7 +17,17 @@
  */
 package org.apache.beam.runners.flink;
 
+import static java.util.Arrays.asList;
+import static org.apache.beam.sdk.testing.RegexMatcher.matches;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.core.Every.everyItem;
+import static org.junit.Assert.assertThat;
+
+import java.io.File;
+import java.io.IOException;
 import java.io.Serializable;
+import java.util.List;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.TextIO;
@@ -27,7 +37,9 @@
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.joda.time.Duration;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
@@ -35,6 +47,8 @@
 @RunWith(JUnit4.class)
 public class FlinkPipelineExecutionEnvironmentTest implements Serializable {
 
+  @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder();
+
   @Test
   public void shouldRecognizeAndTranslateStreamingPipeline() {
     FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
@@ -49,6 +63,7 @@ public void shouldRecognizeAndTranslateStreamingPipeline() {
         .apply(
             ParDo.of(
                 new DoFn<Long, String>() {
+
                   @ProcessElement
                   public void processElement(ProcessContext c) throws 
Exception {
                     c.output(Long.toString(c.element()));
@@ -61,4 +76,63 @@ public void processElement(ProcessContext c) throws 
Exception {
 
     // no exception should be thrown
   }
+
+  @Test
+  public void shouldPrepareFilesToStageWhenFlinkMasterIsSetExplicitly() throws 
IOException {
+    FlinkPipelineOptions options = 
testPreparingResourcesToStage("localhost:8081");
+
+    assertThat(options.getFilesToStage().size(), is(1));
+    assertThat(options.getFilesToStage().get(0), matches(".*\\.jar"));
+  }
+
+  @Test
+  public void shouldNotPrepareFilesToStageWhenFlinkMasterIsSetToAuto() throws 
IOException {
+    FlinkPipelineOptions options = testPreparingResourcesToStage("[auto]");
+
+    assertThat(options.getFilesToStage().size(), is(2));
+    assertThat(options.getFilesToStage(), everyItem(not(matches(".*\\.jar"))));
+  }
+
+  @Test
+  public void shouldNotPrepareFilesToStagewhenFlinkMasterIsSetToCollection() 
throws IOException {
+    FlinkPipelineOptions options = 
testPreparingResourcesToStage("[collection]");
+
+    assertThat(options.getFilesToStage().size(), is(2));
+    assertThat(options.getFilesToStage(), everyItem(not(matches(".*\\.jar"))));
+  }
+
+  @Test
+  public void shouldNotPrepareFilesToStageWhenFlinkMasterIsSetToLocal() throws 
IOException {
+    FlinkPipelineOptions options = testPreparingResourcesToStage("[local]");
+
+    assertThat(options.getFilesToStage().size(), is(2));
+    assertThat(options.getFilesToStage(), everyItem(not(matches(".*\\.jar"))));
+  }
+
+  private FlinkPipelineOptions testPreparingResourcesToStage(String 
flinkMaster)
+      throws IOException {
+    Pipeline pipeline = Pipeline.create();
+    String tempLocation = tmpFolder.newFolder().getAbsolutePath();
+
+    File notEmptyDir = tmpFolder.newFolder();
+    notEmptyDir.createNewFile();
+    String notEmptyDirPath = notEmptyDir.getAbsolutePath();
+    String notExistingPath = "/path/to/not/existing/dir";
+
+    FlinkPipelineOptions options =
+        setPipelineOptions(flinkMaster, tempLocation, asList(notEmptyDirPath, 
notExistingPath));
+    FlinkPipelineExecutionEnvironment flinkEnv = new 
FlinkPipelineExecutionEnvironment(options);
+    flinkEnv.translate(pipeline);
+    return options;
+  }
+
+  private FlinkPipelineOptions setPipelineOptions(
+      String flinkMaster, String tempLocation, List<String> filesToStage) {
+    FlinkPipelineOptions options = 
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(TestFlinkRunner.class);
+    options.setFlinkMaster(flinkMaster);
+    options.setTempLocation(tempLocation);
+    options.setFilesToStage(filesToStage);
+    return options;
+  }
 }
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 285ad481fcf..78299dc3a23 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -27,6 +27,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import org.apache.beam.runners.core.construction.PipelineResources;
 import org.apache.beam.runners.core.construction.TransformInputs;
 import org.apache.beam.runners.core.metrics.MetricsPusher;
 import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
@@ -164,6 +165,8 @@ public SparkPipelineResult run(final Pipeline pipeline) {
 
     
pipeline.replaceAll(SparkTransformOverrides.getDefaultOverrides(mOptions.isStreaming()));
 
+    prepareFilesToStageForRemoteClusterExecution(mOptions);
+
     if (mOptions.isStreaming()) {
       CheckpointDir checkpointDir = new 
CheckpointDir(mOptions.getCheckpointDir());
       SparkRunnerStreamingContextFactory streamingContextFactory =
@@ -272,6 +275,19 @@ private void detectTranslationMode(Pipeline pipeline) {
     }
   }
 
+  /**
+   * Local configurations work in the same JVM and have no problems with 
improperly formatted files
+   * on classpath (eg. directories with .class files or empty directories). 
Prepare files for
+   * staging only when using remote cluster (passing the master address 
explicitly).
+   */
+  private static void 
prepareFilesToStageForRemoteClusterExecution(SparkPipelineOptions options) {
+    if (!options.getSparkMaster().matches("local\\[?\\d*\\]?")) {
+      options.setFilesToStage(
+          PipelineResources.prepareFilesForStaging(
+              options.getFilesToStage(), options.getTempLocation()));
+    }
+  }
+
   /** Evaluator that update/populate the cache candidates. */
   public static void updateCacheCandidates(
       Pipeline pipeline, SparkPipelineTranslator translator, EvaluationContext 
evaluationContext) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 150328)
    Time Spent: 6h 10m  (was: 6h)

> Add ability to stage directories with compiled classes to Spark
> ---------------------------------------------------------------
>
>                 Key: BEAM-3371
>                 URL: https://issues.apache.org/jira/browse/BEAM-3371
>             Project: Beam
>          Issue Type: Sub-task
>          Components: runner-spark
>            Reporter: Lukasz Gajowy
>            Assignee: Jean-Baptiste Onofré
>            Priority: Minor
>          Time Spent: 6h 10m
>  Remaining Estimate: 0h
>
> This one is basically the same issue as
>  [this Flink one|https://issues.apache.org/jira/browse/BEAM-3370], except for
> two things:
> - detection of files to stage has to be provided in Spark, which is already 
> being developed [here|https://issues.apache.org/jira/browse/BEAM-981]
> - the test execution is not interrupted by FileNotFoundException but by *the 
> effect* of the directory not being staged (absence of test classes on the 
> Spark's classpath, hence ClassNotFoundException).
> Again, this could probably be resolved analogously to Flink, once the
> BEAM-981 issue is resolved.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to