[ 
https://issues.apache.org/jira/browse/BEAM-5708?focusedWorklogId=154119&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-154119
 ]

ASF GitHub Bot logged work on BEAM-5708:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Oct/18 18:21
            Start Date: 13/Oct/18 18:21
    Worklog Time Spent: 10m 
      Work Description: tweise closed pull request #6638: [BEAM-5708] Cache environment in portable flink runner
URL: https://github.com/apache/beam/pull/6638
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff on merge, it is reproduced below
for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it:

diff --git 
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy 
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 290f72399db..0ba980b27e5 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -1484,6 +1484,7 @@ artifactId=${project.name}
       def beamTestPipelineOptions = [
         
"--runner=org.apache.beam.runners.reference.testing.TestPortableRunner",
         "--jobServerDriver=${config.jobServerDriver}",
+        "--environmentCacheMillis=10000",
       ]
       if (config.jobServerConfig) {
         
beamTestPipelineOptions.add("--jobServerConfig=${config.jobServerConfig}")
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java
index 988a94826fb..bb2b9dcbe16 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java
@@ -24,12 +24,17 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
 import org.apache.beam.sdk.fn.function.ThrowingFunction;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -104,9 +109,30 @@ private void scheduleRelease(JobInfo jobInfo) {
     WrappedContext wrapper = getCache().get(jobInfo.jobId());
     Preconditions.checkState(
         wrapper != null, "Releasing context for unknown job: " + 
jobInfo.jobId());
-    // Do not release this asynchronously, as the releasing could fail due to 
the classloader not being
-    // available anymore after the tasks have been removed from the execution 
engine.
-    release(wrapper);
+
+    PipelineOptions pipelineOptions =
+        PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions());
+    int environmentCacheTTLMillis =
+        
pipelineOptions.as(PortablePipelineOptions.class).getEnvironmentCacheMillis();
+    if (environmentCacheTTLMillis > 0) {
+      if (this.getClass().getClassLoader() != 
ExecutionEnvironment.class.getClassLoader()) {
+        LOG.warn(
+            "{} is not loaded on parent Flink classloader. "
+                + "Falling back to synchronous environment release for job 
{}.",
+            this.getClass(),
+            jobInfo.jobId());
+        release(wrapper);
+      } else {
+        // Schedule task to clean the container later.
+        // Ensure that this class is loaded in the parent Flink classloader.
+        getExecutor()
+            .schedule(() -> release(wrapper), environmentCacheTTLMillis, 
TimeUnit.MILLISECONDS);
+      }
+    } else {
+      // Do not release this asynchronously, as the releasing could fail due 
to the classloader not
+      // being available anymore after the tasks have been removed from the 
execution engine.
+      release(wrapper);
+    }
   }
 
   private ConcurrentHashMap<String, WrappedContext> getCache() {
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java
index 5107c389bb1..a8dfa8e0665 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java
@@ -87,4 +87,10 @@
   String getSdkWorkerParallelism();
 
   void setSdkWorkerParallelism(@Nullable String parallelism);
+
+  @Description("Duration in milliseconds for environment cache within a job. 0 
means no caching.")
+  @Default.Integer(0)
+  int getEnvironmentCacheMillis();
+
+  void setEnvironmentCacheMillis(int environmentCacheMillis);
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 154119)
    Time Spent: 2h  (was: 1h 50m)

> Support caching of SDKHarness environments in flink
> ---------------------------------------------------
>
>                 Key: BEAM-5708
>                 URL: https://issues.apache.org/jira/browse/BEAM-5708
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-flink
>            Reporter: Ankur Goenka
>            Assignee: Ankur Goenka
>            Priority: Major
>          Time Spent: 2h
>  Remaining Estimate: 0h
>
> Cache and reuse SDK harness environments to improve performance.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to