[
https://issues.apache.org/jira/browse/BEAM-3371?focusedWorklogId=145652&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-145652
]
ASF GitHub Bot logged work on BEAM-3371:
----------------------------------------
Author: ASF GitHub Bot
Created on: 19/Sep/18 13:13
Start Date: 19/Sep/18 13:13
Worklog Time Spent: 10m
Work Description: mxm commented on a change in pull request #6244:
[BEAM-3371] Enable running integration tests on Spark
URL: https://github.com/apache/beam/pull/6244#discussion_r218785917
##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
##########
@@ -95,73 +83,33 @@ public void translate(Pipeline pipeline) {
pipeline.replaceAll(
FlinkTransformOverrides.getDefaultOverrides(translationMode == TranslationMode.STREAMING));
- // Local flink configurations work in the same JVM and have no problems with improperly
- // formatted files on classpath (eg. directories with .class files or empty directories).
- // prepareFilesToStage() only when using remote flink cluster.
- List<String> filesToStage;
- if (!options.getFlinkMaster().matches("\\[.*\\]")) {
- filesToStage = prepareFilesToStage();
- } else {
- filesToStage = options.getFilesToStage();
- }
+ prepareFilesToStageForRemoteClusterExecution();
FlinkPipelineTranslator translator;
if (translationMode == TranslationMode.STREAMING) {
this.flinkStreamEnv =
- FlinkExecutionEnvironments.createStreamExecutionEnvironment(options, filesToStage);
+ FlinkExecutionEnvironments.createStreamExecutionEnvironment(
+ options, options.getFilesToStage());
translator = new FlinkStreamingPipelineTranslator(flinkStreamEnv, options);
} else {
this.flinkBatchEnv =
- FlinkExecutionEnvironments.createBatchExecutionEnvironment(options, filesToStage);
+ FlinkExecutionEnvironments.createBatchExecutionEnvironment(
+ options, options.getFilesToStage());
translator = new FlinkBatchPipelineTranslator(flinkBatchEnv, options);
}
translator.translate(pipeline);
}
- private List<String> prepareFilesToStage() {
- return options
- .getFilesToStage()
- .stream()
- .map(File::new)
- .filter(File::exists)
- .map(file -> file.isDirectory() ? packageDirectoriesToStage(file) : file.getAbsolutePath())
- .collect(Collectors.toList());
- }
-
- private String packageDirectoriesToStage(File directoryToStage) {
- String hash = calculateDirectoryContentHash(directoryToStage);
- String pathForJar = getUniqueJarPath(hash);
- zipDirectory(directoryToStage, pathForJar);
- return pathForJar;
- }
-
- private String calculateDirectoryContentHash(File directoryToStage) {
- Hasher hasher = Hashing.md5().newHasher();
- try (OutputStream hashStream = Funnels.asOutputStream(hasher)) {
- ZipFiles.zipDirectory(directoryToStage, hashStream);
- return Base64Variants.MODIFIED_FOR_URL.encode(hasher.hash().asBytes());
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- private String getUniqueJarPath(String contentHash) {
- String tempLocation = options.getTempLocation();
-
- checkArgument(
- !Strings.isNullOrEmpty(tempLocation),
- "Please provide \"tempLocation\" pipeline option. Flink runner needs it to store jars "
- + "made of directories that were on classpath.");
-
- return String.format("%s%s.jar", tempLocation, contentHash);
- }
-
- private void zipDirectory(File directoryToStage, String uniqueDirectoryPath) {
- try {
- ZipFiles.zipDirectory(directoryToStage, new FileOutputStream(uniqueDirectoryPath));
- } catch (IOException e) {
- throw new RuntimeException(e);
+ /**
+ * Local configurations work in the same JVM and have no problems with improperly formatted files
+ * on classpath (eg. directories with .class files or empty directories). Prepare files for
+ * staging only when using remote cluster.
+ */
+ private void prepareFilesToStageForRemoteClusterExecution() {
+ if (!options.getFlinkMaster().matches("\\[.*\\]")) {
Review comment:
If unset, `flinkMaster` is set to `[auto]` before *test* pipeline execution.
With #6426 (not yet merged) it always defaults to `[auto]`.
`[auto]` can be used when a) testing locally or b) submitting via the Flink
CLI, where the job jar file is already staged and no additional files need
staging. So this shouldn't break anything.
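
As an illustration of the check under discussion, the sketch below applies the
pattern from the diff, `\\[.*\\]`, to a few candidate `flinkMaster` values. The
class and method names are hypothetical, and `localhost:8081` is only an assumed
example of a remote master address.

```java
import java.util.Arrays;
import java.util.List;

public class FlinkMasterCheck {

  // Hypothetical helper mirroring the condition in the diff: bracketed values
  // such as "[auto]" match the pattern and therefore skip file preparation.
  static boolean needsFilePreparation(String flinkMaster) {
    // String.matches() requires the whole string to match the pattern.
    return !flinkMaster.matches("\\[.*\\]");
  }

  public static void main(String[] args) {
    // "localhost:8081" is an assumed example of a remote address.
    List<String> masters = Arrays.asList("[auto]", "[local]", "localhost:8081");
    for (String master : masters) {
      System.out.println(master + " -> prepare files: " + needsFilePreparation(master));
    }
  }
}
```

Under these assumptions only the non-bracketed value triggers file preparation,
which is why a default of `[auto]` skips it.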
Whenever we change code like this we should ideally add tests (which you did
for the new class). Additional tests for the staging in
`FlinkPipelineExecutionEnvironmentTest` would be nice, but we can also do it in
a follow-up.
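
A rough idea of what such staging tests could exercise, written as a JDK-only
sketch rather than against Beam's classes: missing paths are dropped,
directories are zipped into a jar named after a content hash, and regular files
pass through unchanged. All names here are hypothetical; the hash is SHA-256
over the file contents (not the MD5-over-zip-stream approach in the removed
code), and `java.util.zip` stands in for Beam's `ZipFiles`.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

public class StagingSketch {

  // Hypothetical helper: zips `dir` into `tempDir`, naming the jar after a content hash.
  static Path packageDirectory(Path dir, Path tempDir)
      throws IOException, NoSuchAlgorithmException {
    List<Path> files;
    try (Stream<Path> walk = Files.walk(dir)) {
      // Sort so that the hash does not depend on directory traversal order.
      files = walk.filter(Files::isRegularFile).sorted().collect(Collectors.toList());
    }
    MessageDigest digest = MessageDigest.getInstance("SHA-256");
    for (Path file : files) {
      digest.update(entryName(dir, file).getBytes(StandardCharsets.UTF_8));
      digest.update(Files.readAllBytes(file));
    }
    String hash = Base64.getUrlEncoder().withoutPadding().encodeToString(digest.digest());
    Path jar = tempDir.resolve(hash + ".jar");
    try (ZipOutputStream zip = new ZipOutputStream(Files.newOutputStream(jar))) {
      for (Path file : files) {
        zip.putNextEntry(new ZipEntry(entryName(dir, file)));
        Files.copy(file, zip);
        zip.closeEntry();
      }
    }
    return jar;
  }

  // Zip entry names use forward slashes regardless of platform.
  static String entryName(Path dir, Path file) {
    return dir.relativize(file).toString().replace('\\', '/');
  }

  // Hypothetical helper: drops missing paths, packages directories, keeps regular files.
  static List<String> prepareFilesToStage(List<String> filesToStage, Path tempDir)
      throws IOException, NoSuchAlgorithmException {
    List<String> prepared = new ArrayList<>();
    for (String name : filesToStage) {
      Path path = Paths.get(name);
      if (!Files.exists(path)) {
        continue;
      }
      prepared.add(
          Files.isDirectory(path)
              ? packageDirectory(path, tempDir).toString()
              : path.toAbsolutePath().toString());
    }
    return prepared;
  }

  public static void main(String[] args) throws Exception {
    Path work = Files.createTempDirectory("staging-sketch");
    Path classesDir = Files.createDirectory(work.resolve("classes"));
    Files.write(classesDir.resolve("Foo.class"), new byte[] {1, 2, 3});
    Path plainJar = Files.write(work.resolve("dep.jar"), new byte[] {4, 5, 6});
    Path tempDir = Files.createDirectory(work.resolve("tmp"));

    List<String> staged =
        prepareFilesToStage(
            Arrays.asList(
                classesDir.toString(),
                plainJar.toString(),
                work.resolve("missing").toString()),
            tempDir);
    System.out.println(staged);
  }
}
```

A real test in `FlinkPipelineExecutionEnvironmentTest` would assert on the
pipeline options after translation instead; the sketch only shows the three
cases worth covering.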
----------------------------------------------------------------
Issue Time Tracking
-------------------
Worklog Id: (was: 145652)
Time Spent: 3h 20m (was: 3h 10m)
> Add ability to stage directories with compiled classes to Spark
> ---------------------------------------------------------------
>
> Key: BEAM-3371
> URL: https://issues.apache.org/jira/browse/BEAM-3371
> Project: Beam
> Issue Type: Sub-task
> Components: runner-spark
> Reporter: Lukasz Gajowy
> Assignee: Jean-Baptiste Onofré
> Priority: Minor
> Time Spent: 3h 20m
> Remaining Estimate: 0h
>
> This is basically the same issue as
> [the Flink one|https://issues.apache.org/jira/browse/BEAM-3370], except
> for two things:
> - detection of files to stage has to be provided in Spark, which is already
> being developed [here|https://issues.apache.org/jira/browse/BEAM-981]
> - the test execution is interrupted not by a FileNotFoundException but by *the
> effect* of the directory not being staged (the test classes are absent from
> Spark's classpath, hence a ClassNotFoundException).
> Again, this could probably be resolved analogously to the Flink case, once
> BEAM-981 is resolved.