[ 
https://issues.apache.org/jira/browse/BEAM-4523?focusedWorklogId=110279&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-110279
 ]

ASF GitHub Bot logged work on BEAM-4523:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/Jun/18 21:09
            Start Date: 08/Jun/18 21:09
    Worklog Time Spent: 10m 
      Work Description: jkff closed pull request #5588: [BEAM-4523] Implement 
batch flink executable stage context
URL: https://github.com/apache/beam/pull/5588
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/ArtifactSourcePool.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/ArtifactSourcePool.java
index 3881a0b365a..74eae977191 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/ArtifactSourcePool.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/ArtifactSourcePool.java
@@ -24,6 +24,7 @@
 import org.apache.beam.model.jobmanagement.v1.ArtifactApi.Manifest;
 import org.apache.beam.runners.fnexecution.artifact.ArtifactSource;
 
+// TODO: Pull in https://github.com/apache/beam/pull/5359 once we use Flink 
1.6.
 /**
  * A pool of {@link ArtifactSource ArtifactSources} that can be used as a 
single source. At least
  * one source must be registered before artifacts can be requested from it.
@@ -39,6 +40,11 @@
 @ThreadSafe
 public class ArtifactSourcePool implements ArtifactSource {
 
+  /** Creates a new {@link ArtifactSourcePool}. */
+  public static ArtifactSourcePool create() {
+    return new ArtifactSourcePool();
+  }
+
   private ArtifactSourcePool() {}
 
   /**
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/BatchFlinkExecutableStageContext.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/BatchFlinkExecutableStageContext.java
new file mode 100644
index 00000000000..3e1cf1e68e0
--- /dev/null
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/BatchFlinkExecutableStageContext.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalNotification;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.flink.ArtifactSourcePool;
+import org.apache.beam.runners.fnexecution.control.DockerJobBundleFactory;
+import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
+import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Implementation of a {@link FlinkExecutableStageContext} for batch jobs. */
+class BatchFlinkExecutableStageContext implements FlinkExecutableStageContext {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BatchFlinkExecutableStageContext.class);
+
+  private final JobBundleFactory jobBundleFactory;
+  private final ArtifactSourcePool artifactSourcePool;
+
+  private static BatchFlinkExecutableStageContext create(JobInfo jobInfo) 
throws Exception {
+    ArtifactSourcePool artifactSourcePool = ArtifactSourcePool.create();
+    JobBundleFactory jobBundleFactory = DockerJobBundleFactory.create(jobInfo, 
artifactSourcePool);
+    return new BatchFlinkExecutableStageContext(jobBundleFactory, 
artifactSourcePool);
+  }
+
+  private BatchFlinkExecutableStageContext(
+      JobBundleFactory jobBundleFactory, ArtifactSourcePool 
artifactSourcePool) {
+    this.jobBundleFactory = jobBundleFactory;
+    this.artifactSourcePool = artifactSourcePool;
+  }
+
+  @Override
+  public <InputT> StageBundleFactory getStageBundleFactory(ExecutableStage 
executableStage) {
+    return jobBundleFactory.<InputT>forStage(executableStage);
+  }
+
+  @Override
+  public StateRequestHandler getStateRequestHandler(
+      ExecutableStage executableStage, RuntimeContext runtimeContext) {
+    return FlinkBatchStateRequestHandler.forStage(executableStage, 
runtimeContext);
+  }
+
+  @Override
+  public ArtifactSourcePool getArtifactSourcePool() {
+    return artifactSourcePool;
+  }
+
+  private void cleanUp() throws Exception {
+    jobBundleFactory.close();
+  }
+
+  enum BatchFactory implements Factory {
+    INSTANCE;
+
+    private final LoadingCache<JobInfo, BatchFlinkExecutableStageContext> 
cachedContexts;
+
+    BatchFactory() {
+      cachedContexts =
+          CacheBuilder.newBuilder()
+              .weakValues()
+              .removalListener(
+                  (RemovalNotification<JobInfo, 
BatchFlinkExecutableStageContext> removal) -> {
+                    try {
+                      removal.getValue().cleanUp();
+                    } catch (Exception e) {
+                      LOG.warn(
+                          "Error cleaning up bundle factory for job " + 
removal.getKey().jobId(),
+                          e);
+                    }
+                  })
+              .build(
+                  new CacheLoader<JobInfo, BatchFlinkExecutableStageContext>() 
{
+                    @Override
+                    public BatchFlinkExecutableStageContext load(JobInfo 
jobInfo) throws Exception {
+                      return create(jobInfo);
+                    }
+                  });
+    }
+
+    @Override
+    public FlinkExecutableStageContext get(JobInfo jobInfo) {
+      return cachedContexts.getUnchecked(jobInfo);
+    }
+  }
+}
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchStateRequestHandler.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchStateRequestHandler.java
new file mode 100644
index 00000000000..cdf06204c33
--- /dev/null
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchStateRequestHandler.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import java.util.concurrent.CompletionStage;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse.Builder;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+// TODO: https://issues.apache.org/jira/browse/BEAM-4285 Implement batch state 
handler.
+/** State request handler for flink batch runner. */
+public class FlinkBatchStateRequestHandler implements StateRequestHandler {
+
+  private FlinkBatchStateRequestHandler() {}
+
+  public static FlinkBatchStateRequestHandler forStage(
+      ExecutableStage stage, RuntimeContext runtimeContext) {
+    return new FlinkBatchStateRequestHandler();
+  }
+
+  @Override
+  public CompletionStage<Builder> handle(StateRequest request) throws 
Exception {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContext.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContext.java
index 0fd4961f699..d688b8ab80d 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContext.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContext.java
@@ -26,30 +26,24 @@
 import org.apache.flink.api.common.functions.RuntimeContext;
 
 /** The Flink context required in order to execute {@link ExecutableStage 
stages}. */
-public class FlinkExecutableStageContext {
+public interface FlinkExecutableStageContext {
 
   /**
    * Creates {@link FlinkExecutableStageContext} instances. Serializable so 
that factories can be
    * defined at translation time and distributed to TaskManagers.
    */
-  public interface Factory extends Serializable {
+  interface Factory extends Serializable {
     FlinkExecutableStageContext get(JobInfo jobInfo);
   }
 
-  public static Factory batchFactory() {
-    return null;
+  static Factory batchFactory() {
+    return BatchFlinkExecutableStageContext.BatchFactory.INSTANCE;
   }
 
-  public StageBundleFactory getStageBundleFactory(ExecutableStage 
executableStage) {
-    throw new UnsupportedOperationException();
-  }
+  <InputT> StageBundleFactory<InputT> getStageBundleFactory(ExecutableStage 
executableStage);
 
-  public StateRequestHandler getStateRequestHandler(
-      ExecutableStage executableStage, RuntimeContext runtimeContext) {
-    throw new UnsupportedOperationException();
-  }
+  StateRequestHandler getStateRequestHandler(
+      ExecutableStage executableStage, RuntimeContext runtimeContext);
 
-  public ArtifactSourcePool getArtifactSourcePool() {
-    throw new UnsupportedOperationException();
-  }
+  ArtifactSourcePool getArtifactSourcePool();
 }
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
index 3189fcd3d05..e276d372245 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
@@ -39,7 +39,6 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Collector;
 
-// TODO: https://issues.apache.org/jira/browse/BEAM-2597 Implement this 
executable stage operator.
 /**
  * Flink operator that passes its input DataSet through an SDK-executed {@link
  * org.apache.beam.runners.core.construction.graph.ExecutableStage}.
@@ -65,7 +64,7 @@
   // Worker-local fields. These should only be constructed and consumed on 
Flink TaskManagers.
   private transient RuntimeContext runtimeContext;
   private transient StateRequestHandler stateRequestHandler;
-  private transient StageBundleFactory stageBundleFactory;
+  private transient StageBundleFactory<InputT> stageBundleFactory;
   private transient AutoCloseable distributedCacheCloser;
 
   public FlinkExecutableStageFunction(
@@ -85,10 +84,6 @@ public void open(Configuration parameters) throws Exception {
     runtimeContext = getRuntimeContext();
     // TODO: Wire this into the distributed cache and make it pluggable.
     ArtifactSource artifactSource = null;
-    // TODO: Do we really want this layer of indirection when accessing the 
stage bundle factory?
-    // It's a little strange because this operator is responsible for the 
lifetime of the stage
-    // bundle "factory" (manager?) but not the job or Flink bundle factories. 
How do we make
-    // ownership of the higher level "factories" explicit? Do we care?
     FlinkExecutableStageContext stageContext = contextFactory.get(jobInfo);
     ArtifactSourcePool cachePool = stageContext.getArtifactSourcePool();
     distributedCacheCloser = cachePool.addToPool(artifactSource);
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
index 95f4f1741ba..db7f7bc2e22 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
@@ -240,7 +240,7 @@ public void close() throws Exception {
           public void close() {}
         };
     // Wire the stage bundle factory into our context.
-    
when(stageContext.getStageBundleFactory(any())).thenReturn(stageBundleFactory);
+    
when(stageContext.<Void>getStageBundleFactory(any())).thenReturn(stageBundleFactory);
 
     ExecutableStageDoFnOperator<Integer, Integer> operator = 
getOperator(mainOutput,
             ImmutableList.of(additionalOutput1, additionalOutput2),
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunctionTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunctionTest.java
index 6613173233e..08fbcd3e99e 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunctionTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunctionTest.java
@@ -65,7 +65,7 @@
   @Mock private DistributedCache distributedCache;
   @Mock private Collector<RawUnionValue> collector;
   @Mock private FlinkExecutableStageContext stageContext;
-  @Mock private StageBundleFactory stageBundleFactory;
+  @Mock private StageBundleFactory<Integer> stageBundleFactory;
   @Mock private ArtifactSourcePool artifactSourcePool;
   @Mock private StateRequestHandler stateRequestHandler;
 
@@ -87,7 +87,7 @@ public void setUpMocks() {
     when(runtimeContext.getDistributedCache()).thenReturn(distributedCache);
     when(stageContext.getArtifactSourcePool()).thenReturn(artifactSourcePool);
     when(stageContext.getStateRequestHandler(any(), 
any())).thenReturn(stateRequestHandler);
-    
when(stageContext.getStageBundleFactory(any())).thenReturn(stageBundleFactory);
+    
when(stageContext.<Integer>getStageBundleFactory(any())).thenReturn(stageBundleFactory);
   }
 
   @Test
@@ -155,20 +155,22 @@ public void outputsAreTaggedCorrectly() throws Exception {
             "three", 3);
 
     // We use a real StageBundleFactory here in order to exercise the output 
receiver factory.
-    StageBundleFactory<Void> stageBundleFactory =
-        new StageBundleFactory<Void>() {
+    StageBundleFactory<Integer> stageBundleFactory =
+        new StageBundleFactory<Integer>() {
           @Override
-          public RemoteBundle<Void> getBundle(
+          public RemoteBundle<Integer> getBundle(
               OutputReceiverFactory receiverFactory, StateRequestHandler 
stateRequestHandler) {
-            return new RemoteBundle<Void>() {
+            return new RemoteBundle<Integer>() {
               @Override
               public String getId() {
                 return "bundle-id";
               }
 
               @Override
-              public FnDataReceiver<WindowedValue<Void>> getInputReceiver() {
-                return input -> {/* Ignore input*/};
+              public FnDataReceiver<WindowedValue<Integer>> getInputReceiver() 
{
+                return input -> {
+                  /* Ignore input*/
+                };
               }
 
               @Override
@@ -185,7 +187,7 @@ public void close() throws Exception {
           public void close() throws Exception {}
         };
     // Wire the stage bundle factory into our context.
-    
when(stageContext.getStageBundleFactory(any())).thenReturn(stageBundleFactory);
+    
when(stageContext.<Integer>getStageBundleFactory(any())).thenReturn(stageBundleFactory);
 
     FlinkExecutableStageFunction<Integer> function = getFunction(outputTagMap);
     function.open(new Configuration());


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 110279)
    Time Spent: 1.5h  (was: 1h 20m)

> Implement Flink batch ExecutableStage context
> ---------------------------------------------
>
>                 Key: BEAM-4523
>                 URL: https://issues.apache.org/jira/browse/BEAM-4523
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Ben Sidhom
>            Assignee: Ben Sidhom
>            Priority: Major
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> The ExecutableStage context is a wrapper for the job and stage bundle 
> factories and pooled artifact sources. It should take care of caching the 
> overall job bundle factory since we do not have access to job lifecycle hooks 
> in Flink but would like to reuse services and resources across operators 
> within a given job.
> FlinkExecutableStageContext already exists as a skeleton, but it needs to be 
> fleshed out.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to