[
https://issues.apache.org/jira/browse/BEAM-3371?focusedWorklogId=150328&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-150328
]
ASF GitHub Bot logged work on BEAM-3371:
----------------------------------------
Author: ASF GitHub Bot
Created on: 02/Oct/18 09:56
Start Date: 02/Oct/18 09:56
Worklog Time Spent: 10m
Work Description: aromanenko-dev closed pull request #6244: [BEAM-3371]
Enable running integration tests on Spark
URL: https://github.com/apache/beam/pull/6244
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 5b4154d957d..454eb025c05 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -1109,6 +1109,17 @@ artifactId=${project.name}
testCompile it.project(path: ":beam-runners-flink_2.11",
configuration: 'shadowTest')
}
+ if (runner?.equalsIgnoreCase('spark')) {
+ testCompile it.project(path: ":beam-runners-spark", configuration:
'shadowTest')
+ testCompile project.library.java.spark_core
+ testCompile project.library.java.spark_streaming
+
+ // Testing the Spark runner causes a StackOverflowError if
slf4j-jdk14 is on the classpath
+ project.configurations.testRuntimeClasspath {
+ exclude group: "org.slf4j", module: "slf4j-jdk14"
+ }
+ }
+
/* include dependencies required by filesystems */
if (filesystem?.equalsIgnoreCase('hdfs')) {
testCompile it.project(path:
":beam-sdks-java-io-hadoop-file-system", configuration: 'shadowTest')
diff --git
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineResources.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineResources.java
index f718b70bcf3..c982e62a330 100644
---
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineResources.java
+++
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineResources.java
@@ -17,12 +17,24 @@
*/
package org.apache.beam.runners.core.construction;
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.fasterxml.jackson.core.Base64Variants;
+import com.google.common.base.Strings;
+import com.google.common.hash.Funnels;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.util.ZipFiles;
/** Utilities for working with classpath resources for pipelines. */
public class PipelineResources {
@@ -57,4 +69,60 @@
}
return files;
}
+
+ /**
+ * Goes through the list of files that need to be staged on runner. Removes
nonexistent
+ * directories and packages existing ones. This is necessary for runners
that require filesToStage
+ * to be jars only.
+ *
+ * @param resourcesToStage list of resources that need to be staged
+ * @param tmpJarLocation temporary directory to store the jars
+ * @return A list of absolute paths to resources (jar files)
+ */
+ public static List<String> prepareFilesForStaging(
+ List<String> resourcesToStage, String tmpJarLocation) {
+ return resourcesToStage
+ .stream()
+ .map(File::new)
+ .filter(File::exists)
+ .map(
+ file ->
+ file.isDirectory()
+ ? packageDirectoriesToStage(file, tmpJarLocation)
+ : file.getAbsolutePath())
+ .collect(Collectors.toList());
+ }
+
+ private static String packageDirectoriesToStage(File directoryToStage,
String tmpJarLocation) {
+ String hash = calculateDirectoryContentHash(directoryToStage);
+ String pathForJar = getUniqueJarPath(hash, tmpJarLocation);
+ zipDirectory(directoryToStage, pathForJar);
+ return pathForJar;
+ }
+
+ private static String calculateDirectoryContentHash(File directoryToStage) {
+ Hasher hasher = Hashing.md5().newHasher();
+ try (OutputStream hashStream = Funnels.asOutputStream(hasher)) {
+ ZipFiles.zipDirectory(directoryToStage, hashStream);
+ return Base64Variants.MODIFIED_FOR_URL.encode(hasher.hash().asBytes());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static String getUniqueJarPath(String contentHash, String
tmpJarLocation) {
+ checkArgument(
+ !Strings.isNullOrEmpty(tmpJarLocation),
+ "Please provide temporary location for storing the jar files.");
+
+ return String.format("%s%s.jar", tmpJarLocation, contentHash);
+ }
+
+ private static void zipDirectory(File directoryToStage, String
uniqueDirectoryPath) {
+ try {
+ ZipFiles.zipDirectory(directoryToStage, new
FileOutputStream(uniqueDirectoryPath));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineResourcesTest.java
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineResourcesTest.java
index cf33dc55ed4..1fa6d4314ab 100644
---
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineResourcesTest.java
+++
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineResourcesTest.java
@@ -17,12 +17,19 @@
*/
package org.apache.beam.runners.core.construction;
+import static junit.framework.TestCase.assertTrue;
+import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
import com.google.common.collect.ImmutableList;
import java.io.File;
+import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -68,4 +75,43 @@ public void detectClassPathResourceWithNonFileResources()
throws Exception {
PipelineResources.detectClassPathResourcesToStage(classLoader);
}
+
+ @Test
+ public void testRemovingNonexistentFilesFromFilesToStage() throws
IOException {
+ String nonexistentFilePath = tmpFolder.getRoot().getPath() +
"/nonexistent/file";
+ String existingFilePath =
tmpFolder.newFile("existingFile").getAbsolutePath();
+ String temporaryLocation = tmpFolder.newFolder().getAbsolutePath();
+
+ List<String> filesToStage = Arrays.asList(nonexistentFilePath,
existingFilePath);
+ List<String> expectedFilesToStage = Arrays.asList(existingFilePath);
+
+ List<String> result =
PipelineResources.prepareFilesForStaging(filesToStage, temporaryLocation);
+
+ assertThat(result, is(expectedFilesToStage));
+ }
+
+ @Test
+ public void testPackagingDirectoryResourceToJarFile() throws IOException {
+ String directoryPath = tmpFolder.newFolder().getAbsolutePath();
+ String temporaryLocation = tmpFolder.newFolder().getAbsolutePath();
+
+ List<String> filesToStage = new ArrayList<>();
+ filesToStage.add(directoryPath);
+
+ List<String> result =
PipelineResources.prepareFilesForStaging(filesToStage, temporaryLocation);
+
+ assertTrue(new File(result.get(0)).exists());
+ assertTrue(result.get(0).matches(".*\\.jar"));
+ }
+
+ @Test
+ public void testIfThrowsWhenThereIsNoTemporaryFolderForJars() throws
IOException {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Please provide temporary location for storing the
jar files.");
+
+ List<String> filesToStage = new ArrayList<>();
+ filesToStage.add(tmpFolder.newFolder().getAbsolutePath());
+
+ PipelineResources.prepareFilesForStaging(filesToStage, null);
+ }
}
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index 282c9bd8618..a5bfe70363e 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -17,22 +17,10 @@
*/
package org.apache.beam.runners.flink;
-import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
-import com.fasterxml.jackson.core.Base64Variants;
-import com.google.common.base.Strings;
-import com.google.common.hash.Funnels;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.List;
-import java.util.stream.Collectors;
+import org.apache.beam.runners.core.construction.PipelineResources;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.util.ZipFiles;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -95,73 +83,34 @@ public void translate(Pipeline pipeline) {
pipeline.replaceAll(
FlinkTransformOverrides.getDefaultOverrides(translationMode ==
TranslationMode.STREAMING));
- // Local flink configurations work in the same JVM and have no problems
with improperly
- // formatted files on classpath (eg. directories with .class files or
empty directories).
- // prepareFilesToStage() only when using remote flink cluster.
- List<String> filesToStage;
- if (!options.getFlinkMaster().matches("\\[.*\\]")) {
- filesToStage = prepareFilesToStage();
- } else {
- filesToStage = options.getFilesToStage();
- }
+ prepareFilesToStageForRemoteClusterExecution(options);
FlinkPipelineTranslator translator;
if (translationMode == TranslationMode.STREAMING) {
this.flinkStreamEnv =
- FlinkExecutionEnvironments.createStreamExecutionEnvironment(options,
filesToStage);
+ FlinkExecutionEnvironments.createStreamExecutionEnvironment(
+ options, options.getFilesToStage());
translator = new FlinkStreamingPipelineTranslator(flinkStreamEnv,
options);
} else {
this.flinkBatchEnv =
- FlinkExecutionEnvironments.createBatchExecutionEnvironment(options,
filesToStage);
+ FlinkExecutionEnvironments.createBatchExecutionEnvironment(
+ options, options.getFilesToStage());
translator = new FlinkBatchPipelineTranslator(flinkBatchEnv, options);
}
translator.translate(pipeline);
}
- private List<String> prepareFilesToStage() {
- return options
- .getFilesToStage()
- .stream()
- .map(File::new)
- .filter(File::exists)
- .map(file -> file.isDirectory() ? packageDirectoriesToStage(file) :
file.getAbsolutePath())
- .collect(Collectors.toList());
- }
-
- private String packageDirectoriesToStage(File directoryToStage) {
- String hash = calculateDirectoryContentHash(directoryToStage);
- String pathForJar = getUniqueJarPath(hash);
- zipDirectory(directoryToStage, pathForJar);
- return pathForJar;
- }
-
- private String calculateDirectoryContentHash(File directoryToStage) {
- Hasher hasher = Hashing.md5().newHasher();
- try (OutputStream hashStream = Funnels.asOutputStream(hasher)) {
- ZipFiles.zipDirectory(directoryToStage, hashStream);
- return Base64Variants.MODIFIED_FOR_URL.encode(hasher.hash().asBytes());
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- private String getUniqueJarPath(String contentHash) {
- String tempLocation = options.getTempLocation();
-
- checkArgument(
- !Strings.isNullOrEmpty(tempLocation),
- "Please provide \"tempLocation\" pipeline option. Flink runner needs
it to store jars "
- + "made of directories that were on classpath.");
-
- return String.format("%s%s.jar", tempLocation, contentHash);
- }
-
- private void zipDirectory(File directoryToStage, String uniqueDirectoryPath)
{
- try {
- ZipFiles.zipDirectory(directoryToStage, new
FileOutputStream(uniqueDirectoryPath));
- } catch (IOException e) {
- throw new RuntimeException(e);
+ /**
+ * Local configurations work in the same JVM and have no problems with
improperly formatted files
+ * on classpath (eg. directories with .class files or empty directories).
Prepare files for
+ * staging only when using remote cluster (passing the master address
explicitly).
+ */
+ private static void
prepareFilesToStageForRemoteClusterExecution(FlinkPipelineOptions options) {
+ if
(!options.getFlinkMaster().matches("\\[auto\\]|\\[collection\\]|\\[local\\]")) {
+ options.setFilesToStage(
+ PipelineResources.prepareFilesForStaging(
+ options.getFilesToStage(), options.getTempLocation()));
}
}
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java
index 926a89d23a4..f7ea59c2ef6 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java
@@ -17,7 +17,17 @@
*/
package org.apache.beam.runners.flink;
+import static java.util.Arrays.asList;
+import static org.apache.beam.sdk.testing.RegexMatcher.matches;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.core.Every.everyItem;
+import static org.junit.Assert.assertThat;
+
+import java.io.File;
+import java.io.IOException;
import java.io.Serializable;
+import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.TextIO;
@@ -27,7 +37,9 @@
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -35,6 +47,8 @@
@RunWith(JUnit4.class)
public class FlinkPipelineExecutionEnvironmentTest implements Serializable {
+ @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder();
+
@Test
public void shouldRecognizeAndTranslateStreamingPipeline() {
FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
@@ -49,6 +63,7 @@ public void shouldRecognizeAndTranslateStreamingPipeline() {
.apply(
ParDo.of(
new DoFn<Long, String>() {
+
@ProcessElement
public void processElement(ProcessContext c) throws
Exception {
c.output(Long.toString(c.element()));
@@ -61,4 +76,63 @@ public void processElement(ProcessContext c) throws
Exception {
// no exception should be thrown
}
+
+ @Test
+ public void shouldPrepareFilesToStageWhenFlinkMasterIsSetExplicitly() throws
IOException {
+ FlinkPipelineOptions options =
testPreparingResourcesToStage("localhost:8081");
+
+ assertThat(options.getFilesToStage().size(), is(1));
+ assertThat(options.getFilesToStage().get(0), matches(".*\\.jar"));
+ }
+
+ @Test
+ public void shouldNotPrepareFilesToStageWhenFlinkMasterIsSetToAuto() throws
IOException {
+ FlinkPipelineOptions options = testPreparingResourcesToStage("[auto]");
+
+ assertThat(options.getFilesToStage().size(), is(2));
+ assertThat(options.getFilesToStage(), everyItem(not(matches(".*\\.jar"))));
+ }
+
+ @Test
+ public void shouldNotPrepareFilesToStagewhenFlinkMasterIsSetToCollection()
throws IOException {
+ FlinkPipelineOptions options =
testPreparingResourcesToStage("[collection]");
+
+ assertThat(options.getFilesToStage().size(), is(2));
+ assertThat(options.getFilesToStage(), everyItem(not(matches(".*\\.jar"))));
+ }
+
+ @Test
+ public void shouldNotPrepareFilesToStageWhenFlinkMasterIsSetToLocal() throws
IOException {
+ FlinkPipelineOptions options = testPreparingResourcesToStage("[local]");
+
+ assertThat(options.getFilesToStage().size(), is(2));
+ assertThat(options.getFilesToStage(), everyItem(not(matches(".*\\.jar"))));
+ }
+
+ private FlinkPipelineOptions testPreparingResourcesToStage(String
flinkMaster)
+ throws IOException {
+ Pipeline pipeline = Pipeline.create();
+ String tempLocation = tmpFolder.newFolder().getAbsolutePath();
+
+ File notEmptyDir = tmpFolder.newFolder();
+ notEmptyDir.createNewFile();
+ String notEmptyDirPath = notEmptyDir.getAbsolutePath();
+ String notExistingPath = "/path/to/not/existing/dir";
+
+ FlinkPipelineOptions options =
+ setPipelineOptions(flinkMaster, tempLocation, asList(notEmptyDirPath,
notExistingPath));
+ FlinkPipelineExecutionEnvironment flinkEnv = new
FlinkPipelineExecutionEnvironment(options);
+ flinkEnv.translate(pipeline);
+ return options;
+ }
+
+ private FlinkPipelineOptions setPipelineOptions(
+ String flinkMaster, String tempLocation, List<String> filesToStage) {
+ FlinkPipelineOptions options =
PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ options.setRunner(TestFlinkRunner.class);
+ options.setFlinkMaster(flinkMaster);
+ options.setTempLocation(tempLocation);
+ options.setFilesToStage(filesToStage);
+ return options;
+ }
}
diff --git
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 285ad481fcf..78299dc3a23 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -27,6 +27,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import org.apache.beam.runners.core.construction.PipelineResources;
import org.apache.beam.runners.core.construction.TransformInputs;
import org.apache.beam.runners.core.metrics.MetricsPusher;
import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
@@ -164,6 +165,8 @@ public SparkPipelineResult run(final Pipeline pipeline) {
pipeline.replaceAll(SparkTransformOverrides.getDefaultOverrides(mOptions.isStreaming()));
+ prepareFilesToStageForRemoteClusterExecution(mOptions);
+
if (mOptions.isStreaming()) {
CheckpointDir checkpointDir = new
CheckpointDir(mOptions.getCheckpointDir());
SparkRunnerStreamingContextFactory streamingContextFactory =
@@ -272,6 +275,19 @@ private void detectTranslationMode(Pipeline pipeline) {
}
}
+ /**
+ * Local configurations work in the same JVM and have no problems with
improperly formatted files
+ * on classpath (eg. directories with .class files or empty directories).
Prepare files for
+ * staging only when using remote cluster (passing the master address
explicitly).
+ */
+ private static void
prepareFilesToStageForRemoteClusterExecution(SparkPipelineOptions options) {
+ if (!options.getSparkMaster().matches("local\\[?\\d*\\]?")) {
+ options.setFilesToStage(
+ PipelineResources.prepareFilesForStaging(
+ options.getFilesToStage(), options.getTempLocation()));
+ }
+ }
+
/** Evaluator that update/populate the cache candidates. */
public static void updateCacheCandidates(
Pipeline pipeline, SparkPipelineTranslator translator, EvaluationContext
evaluationContext) {
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 150328)
Time Spent: 6h 10m (was: 6h)
> Add ability to stage directories with compiled classes to Spark
> ---------------------------------------------------------------
>
> Key: BEAM-3371
> URL: https://issues.apache.org/jira/browse/BEAM-3371
> Project: Beam
> Issue Type: Sub-task
> Components: runner-spark
> Reporter: Lukasz Gajowy
> Assignee: Jean-Baptiste Onofré
> Priority: Minor
> Time Spent: 6h 10m
> Remaining Estimate: 0h
>
> This is basically the same issue as
> [the Flink one|https://issues.apache.org/jira/browse/BEAM-3370], except
> for two things:
> - detection of files to stage has to be provided in Spark, which is already
> being developed [here|https://issues.apache.org/jira/browse/BEAM-981]
> - the test execution is not interrupted by a FileNotFoundException but by *the
> effect* of the directory not being staged (absence of test classes on
> Spark's classpath, hence a ClassNotFoundException).
> Again, this could probably be resolved in the same way as in Flink, once the
> BEAM-981 issue is resolved.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)