[
https://issues.apache.org/jira/browse/BEAM-5636?focusedWorklogId=154370&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-154370
]
ASF GitHub Bot logged work on BEAM-5636:
----------------------------------------
Author: ASF GitHub Bot
Created on: 15/Oct/18 16:46
Start Date: 15/Oct/18 16:46
Worklog Time Spent: 10m
Work Description: herohde closed pull request #6665: [BEAM-5636] Java
support for custom dataflow worker jar
URL: https://github.com/apache/beam/pull/6665
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it has been merged, the diff is reproduced
below for the sake of provenance:
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 54c39f02fb1..15740650017 100644
---
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -741,6 +741,15 @@ public DataflowPipelineJob run(Pipeline pipeline) {
options.getStager().stageToFile(serializedProtoPipeline,
PIPELINE_FILE_NAME);
dataflowOptions.setPipelineUrl(stagedPipeline.getLocation());
+ if (!isNullOrEmpty(dataflowOptions.getDataflowWorkerJar())) {
+ List<String> experiments =
+ dataflowOptions.getExperiments() == null
+ ? new ArrayList<>()
+ : dataflowOptions.getExperiments();
+ experiments.add("use_staged_dataflow_worker_jar");
+ dataflowOptions.setExperiments(experiments);
+ }
+
Job newJob = jobSpecification.getJob();
try {
newJob
@@ -791,6 +800,7 @@ public DataflowPipelineJob run(Pipeline pipeline) {
minCpuFlags.get(0));
}
}
+
newJob.getEnvironment().setExperiments(experiments);
// Set the Docker container image that executes Dataflow worker harness,
residing in Google
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
index 43e80c290bc..6670e9e911b 100644
---
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
+++
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
@@ -136,6 +136,11 @@
void setPipelineUrl(String urlString);
+ @Description("The customized dataflow worker jar")
+ String getDataflowWorkerJar();
+
+ void setDataflowWorkerJar(String dataflowWorkerJar);
+
/** Returns a default staging location under {@link
GcpOptions#getGcpTempLocation}. */
class StagingLocationFactory implements DefaultValueFactory<String> {
private static final Logger LOG =
LoggerFactory.getLogger(StagingLocationFactory.class);
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
index 7ed78e83baa..0487b5d07a4 100644
---
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
+++
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
@@ -54,13 +54,17 @@ public static GcsStager fromOptions(PipelineOptions
options) {
checkNotNull(options.getStagingLocation());
String windmillBinary =
options.as(DataflowPipelineDebugOptions.class).getOverrideWindmillBinary();
-
+ String dataflowWorkerJar = options.getDataflowWorkerJar();
List<String> filesToStage = options.getFilesToStage();
if (windmillBinary != null) {
filesToStage.add("windmill_main=" + windmillBinary);
}
+ if (dataflowWorkerJar != null && !dataflowWorkerJar.isEmpty()) {
+ filesToStage.add("dataflow-worker.jar=" + dataflowWorkerJar);
+ }
+
return stageFiles(filesToStage);
}
diff --git a/sdks/java/container/boot.go b/sdks/java/container/boot.go
index 6a5b9a1bbd5..96915668860 100644
--- a/sdks/java/container/boot.go
+++ b/sdks/java/container/boot.go
@@ -103,7 +103,17 @@ func main() {
filepath.Join(jarsDir, "slf4j-jdk14.jar"),
filepath.Join(jarsDir, "beam-sdks-java-harness.jar"),
}
+
+ var hasWorkerExperiment = strings.Contains(options,
"use_staged_dataflow_worker_jar")
for _, md := range artifacts {
+ if hasWorkerExperiment {
+ if strings.HasPrefix(md.Name,
"beam-runners-google-cloud-dataflow-java-fn-api-worker") {
+ continue
+ }
+ if md.Name == "dataflow-worker.jar" {
+ continue
+ }
+ }
cp = append(cp, filepath.Join(dir, filepath.FromSlash(md.Name)))
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 154370)
Time Spent: 1.5h (was: 1h 20m)
> Java support for custom dataflow worker jar
> -------------------------------------------
>
> Key: BEAM-5636
> URL: https://issues.apache.org/jira/browse/BEAM-5636
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-java-core
> Reporter: Henning Rohde
> Assignee: Boyuan Zhang
> Priority: Major
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
> One of the slightly subtle aspects is that we would need to ignore one of the
> staged jars for portable Java jobs. That requires a change to the Java boot
> code:
> https://github.com/apache/beam/blob/66d7c865b7267f388ee60752891a9141fad43774/sdks/java/container/boot.go#L107
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)