[ 
https://issues.apache.org/jira/browse/BEAM-3370?focusedWorklogId=134125&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-134125
 ]

ASF GitHub Bot logged work on BEAM-3370:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Aug/18 11:44
            Start Date: 13/Aug/18 11:44
    Worklog Time Spent: 10m 
      Work Description: lgajowy closed pull request #6179: [BEAM-3370 & 
BEAM-3359] Enable running IOIT on flink
URL: https://github.com/apache/beam/pull/6179
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy 
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index f3b5fb53134..ca1786a0583 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -1060,6 +1060,10 @@ artifactId=${project.name}
           testCompile it.project(path: ":beam-runners-direct-java", 
configuration: 'shadowTest')
         }
 
+        if (runner?.equalsIgnoreCase('flink')) {
+          testCompile it.project(path: ":beam-runners-flink_2.11", 
configuration: 'shadowTest')
+        }
+
         /* include dependencies required by filesystems */
         if (filesystem?.equalsIgnoreCase('hdfs')) {
           testCompile it.project(path: 
":beam-sdks-java-io-hadoop-file-system", configuration: 'shadowTest')
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index 84876604cd0..d985e354f35 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -17,9 +17,22 @@
  */
 package org.apache.beam.runners.flink;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import com.fasterxml.jackson.core.Base64Variants;
+import com.google.common.base.Strings;
+import com.google.common.hash.Funnels;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.stream.Collectors;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.util.ZipFiles;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -82,22 +95,76 @@ public void translate(FlinkRunner flinkRunner, Pipeline 
pipeline) {
     pipeline.replaceAll(
         FlinkTransformOverrides.getDefaultOverrides(translationMode == 
TranslationMode.STREAMING));
 
+    // Local flink configurations work in the same JVM and have no problems 
with improperly
+    // formatted files on classpath (eg. directories with .class files or 
empty directories).
+    // prepareFilesToStage() only when using remote flink cluster.
+    List<String> filesToStage;
+    if (!options.getFlinkMaster().matches("\\[.*\\]")) {
+      filesToStage = prepareFilesToStage();
+    } else {
+      filesToStage = options.getFilesToStage();
+    }
+
     FlinkPipelineTranslator translator;
     if (translationMode == TranslationMode.STREAMING) {
       this.flinkStreamEnv =
-          FlinkExecutionEnvironments.createStreamExecutionEnvironment(
-              options, options.getFilesToStage());
+          FlinkExecutionEnvironments.createStreamExecutionEnvironment(options, 
filesToStage);
       translator = new FlinkStreamingPipelineTranslator(flinkRunner, 
flinkStreamEnv, options);
     } else {
       this.flinkBatchEnv =
-          FlinkExecutionEnvironments.createBatchExecutionEnvironment(
-              options, options.getFilesToStage());
+          FlinkExecutionEnvironments.createBatchExecutionEnvironment(options, 
filesToStage);
       translator = new FlinkBatchPipelineTranslator(flinkBatchEnv, options);
     }
 
     translator.translate(pipeline);
   }
 
+  private List<String> prepareFilesToStage() {
+    return options
+        .getFilesToStage()
+        .stream()
+        .map(File::new)
+        .filter(File::exists)
+        .map(file -> file.isDirectory() ? packageDirectoriesToStage(file) : 
file.getAbsolutePath())
+        .collect(Collectors.toList());
+  }
+
+  private String packageDirectoriesToStage(File directoryToStage) {
+    String hash = calculateDirectoryContentHash(directoryToStage);
+    String pathForJar = getUniqueJarPath(hash);
+    zipDirectory(directoryToStage, pathForJar);
+    return pathForJar;
+  }
+
+  private String calculateDirectoryContentHash(File directoryToStage) {
+    Hasher hasher = Hashing.md5().newHasher();
+    try (OutputStream hashStream = Funnels.asOutputStream(hasher)) {
+      ZipFiles.zipDirectory(directoryToStage, hashStream);
+      return Base64Variants.MODIFIED_FOR_URL.encode(hasher.hash().asBytes());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private String getUniqueJarPath(String contentHash) {
+    String tempLocation = options.getTempLocation();
+
+    checkArgument(
+        !Strings.isNullOrEmpty(tempLocation),
+        "Please provide \"tempLocation\" pipeline option. Flink runner needs 
it to store jars "
+            + "made of directories that were on classpath.");
+
+    return String.format("%s%s.jar", tempLocation, contentHash);
+  }
+
+  private void zipDirectory(File directoryToStage, String uniqueDirectoryPath) 
{
+    try {
+      ZipFiles.zipDirectory(directoryToStage, new 
FileOutputStream(uniqueDirectoryPath));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   /** Launches the program execution. */
   public JobExecutionResult executePipeline() throws Exception {
     final String jobName = options.getJobName();
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
index a14436efab7..c3f7415354f 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
@@ -32,8 +32,6 @@
   private FlinkRunner delegate;
 
   private TestFlinkRunner(FlinkPipelineOptions options) {
-    // We use [auto] for testing since this will make it pick up the Testing 
ExecutionEnvironment
-    options.setFlinkMaster("[auto]");
     options.setShutdownSourcesOnFinalWatermark(true);
     this.delegate = FlinkRunner.fromOptions(options);
   }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 134125)
    Time Spent: 1h 50m  (was: 1h 40m)

> Add ability to stage directories with compiled classes to Flink
> ---------------------------------------------------------------
>
>                 Key: BEAM-3370
>                 URL: https://issues.apache.org/jira/browse/BEAM-3370
>             Project: Beam
>          Issue Type: Sub-task
>          Components: runner-flink
>            Reporter: Lukasz Gajowy
>            Priority: Minor
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> Currently, when _filesToStage_ contain a path to directory with resources, 
> flink runner throws a {{"java.io.FileNotFoundException: <path_to_the_dir> (Is 
> a directory)"}}. A way to include directory resources would be helpful.
> This "blocker" occurs while trying to run IOITs on flink runner, which 
> basically makes it impossible/very inconvenient to run. When the tests are 
> run via "mvn verify" command, a "test-classes" *directory* gets detected by 
> detectClasspathResourcesToStage() method which in turn causes the above error.
> One way to solve this issue is to package the directories to jars with unique 
> names and update the paths accordingly before staging the files on flink. 
> Something similar is already done in the Dataflow runner 
> ([GcsStager|https://github.com/apache/beam/blob/cd186a531aaff0b21cf009b034e1a41f7e7b64af/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java#L74]),
>  more specifically in 
> [PackageUtil|https://github.com/apache/beam/blob/cd186a531aaff0b21cf009b034e1a41f7e7b64af/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java#L280]
>  class. We are able to run the tests on dataflow thanks to that.
> As I checked in a [small experiment of 
> mine|https://github.com/lgajowy/beam/commits/spark-and-flink-run-tests], 
> providing analogous change makes it possible to run the tests on a Flink 
> cluster.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to