[
https://issues.apache.org/jira/browse/BEAM-5288?focusedWorklogId=147338&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-147338
]
ASF GitHub Bot logged work on BEAM-5288:
----------------------------------------
Author: ASF GitHub Bot
Created on: 24/Sep/18 22:04
Start Date: 24/Sep/18 22:04
Worklog Time Spent: 10m
Work Description: tweise commented on a change in pull request #6441:
[BEAM-5288] Support environment pipeline option in Java and Python.
URL: https://github.com/apache/beam/pull/6441#discussion_r220002094
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java
##########
@@ -17,18 +17,34 @@
*/
package org.apache.beam.runners.flink.translation.functions;
+import com.google.common.collect.ImmutableMap;
+import org.apache.beam.model.pipeline.v1.RunnerApi.StandardEnvironments;
+import org.apache.beam.runners.core.construction.BeamUrns;
+import org.apache.beam.runners.core.construction.Environments;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
-import org.apache.beam.runners.fnexecution.control.DockerJobBundleFactory;
import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
+import org.apache.beam.runners.fnexecution.control.LazyJobBundleFactory;
import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
+import
org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory;
+import
org.apache.beam.runners.fnexecution.environment.InProcessEnvironmentFactory;
+import
org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
/** Implementation of a {@link FlinkExecutableStageContext}. */
class FlinkDefaultExecutableStageContext implements
FlinkExecutableStageContext, AutoCloseable {
private final JobBundleFactory jobBundleFactory;
private static FlinkDefaultExecutableStageContext create(JobInfo jobInfo)
throws Exception {
- JobBundleFactory jobBundleFactory =
DockerJobBundleFactory.FACTORY.get().create(jobInfo);
+ JobBundleFactory jobBundleFactory =
+ LazyJobBundleFactory.create(
+ jobInfo,
+ ImmutableMap.of(
+ BeamUrns.getUrn(StandardEnvironments.Environments.DOCKER),
+ new DockerEnvironmentFactory.Provider(),
+ BeamUrns.getUrn(StandardEnvironments.Environments.PROCESS),
+ new ProcessEnvironmentFactory.Provider(),
+ Environments.ENVIRONMENT_INPROCESS, // Non Public urn for
testing.
Review comment:
And it should be a public URN.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 147338)
Time Spent: 7h 40m (was: 7.5h)
> Modify Environment to support non-dockerized SDK harness deployments
> ---------------------------------------------------------------------
>
> Key: BEAM-5288
> URL: https://issues.apache.org/jira/browse/BEAM-5288
> Project: Beam
> Issue Type: New Feature
> Components: beam-model
> Reporter: Maximilian Michels
> Assignee: Ankur Goenka
> Priority: Major
> Time Spent: 7h 40m
> Remaining Estimate: 0h
>
> As of mailing discussions and BEAM-5187, it has become clear that we need to
> extend the Environment information. In addition to the Docker environment,
> the extended environment holds deployment options for 1) a process-based
> environment, 2) an externally managed environment.
> The proto definition, as of now, looks as follows:
> {noformat}
> message Environment {
> // (Required) The URN of the payload
> string urn = 1;
> // (Optional) The data specifying any parameters to the URN. If
> // the URN does not require any arguments, this may be omitted.
> bytes payload = 2;
> }
> message StandardEnvironments {
> enum Environments {
> DOCKER = 0 [(beam_urn) = "beam:env:docker:v1"];
> PROCESS = 1 [(beam_urn) = "beam:env:process:v1"];
> EXTERNAL = 2 [(beam_urn) = "beam:env:external:v1"];
> }
> }
> // The payload of a Docker image
> message DockerPayload {
> string container_image = 1; // implicitly linux_amd64.
> }
> message ProcessPayload {
> string os = 1; // "linux", "darwin", ..
> string arch = 2; // "amd64", ..
> string command = 3; // process to execute
> map<string, string> env = 4; // environment variables
> }
> {noformat}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)