[ https://issues.apache.org/jira/browse/BEAM-5708?focusedWorklogId=154119&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-154119 ]
ASF GitHub Bot logged work on BEAM-5708: ---------------------------------------- Author: ASF GitHub Bot Created on: 13/Oct/18 18:21 Start Date: 13/Oct/18 18:21 Worklog Time Spent: 10m Work Description: tweise closed pull request #6638: [BEAM-5708] Cache environment in portable flink runner URL: https://github.com/apache/beam/pull/6638 This is a PR merged from a forked repository. Because GitHub does not display the original diff of a merged pull request from a fork, the diff is reproduced below for the sake of provenance: diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 290f72399db..0ba980b27e5 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -1484,6 +1484,7 @@ artifactId=${project.name} def beamTestPipelineOptions = [ "--runner=org.apache.beam.runners.reference.testing.TestPortableRunner", "--jobServerDriver=${config.jobServerDriver}", + "--environmentCacheMillis=10000", ] if (config.jobServerConfig) { beamTestPipelineOptions.add("--jobServerConfig=${config.jobServerConfig}") diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java index 988a94826fb..bb2b9dcbe16 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java @@ -24,12 +24,17 @@ import java.util.concurrent.ConcurrentHashMap; import 
java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.beam.runners.core.construction.PipelineOptionsTranslation; import org.apache.beam.runners.core.construction.graph.ExecutableStage; import org.apache.beam.runners.fnexecution.control.StageBundleFactory; import org.apache.beam.runners.fnexecution.provisioning.JobInfo; import org.apache.beam.sdk.fn.function.ThrowingFunction; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PortablePipelineOptions; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.java.ExecutionEnvironment; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -104,9 +109,30 @@ private void scheduleRelease(JobInfo jobInfo) { WrappedContext wrapper = getCache().get(jobInfo.jobId()); Preconditions.checkState( wrapper != null, "Releasing context for unknown job: " + jobInfo.jobId()); - // Do not release this asynchronously, as the releasing could fail due to the classloader not being - // available anymore after the tasks have been removed from the execution engine. - release(wrapper); + + PipelineOptions pipelineOptions = + PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions()); + int environmentCacheTTLMillis = + pipelineOptions.as(PortablePipelineOptions.class).getEnvironmentCacheMillis(); + if (environmentCacheTTLMillis > 0) { + if (this.getClass().getClassLoader() != ExecutionEnvironment.class.getClassLoader()) { + LOG.warn( + "{} is not loaded on parent Flink classloader. " + + "Falling back to synchronous environment release for job {}.", + this.getClass(), + jobInfo.jobId()); + release(wrapper); + } else { + // Schedule task to clean the container later. + // Ensure that this class is loaded in the parent Flink classloader. 
+ getExecutor() + .schedule(() -> release(wrapper), environmentCacheTTLMillis, TimeUnit.MILLISECONDS); + } + } else { + // Do not release this asynchronously, as the releasing could fail due to the classloader not + // being available anymore after the tasks have been removed from the execution engine. + release(wrapper); + } } private ConcurrentHashMap<String, WrappedContext> getCache() { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java index 5107c389bb1..a8dfa8e0665 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java @@ -87,4 +87,10 @@ String getSdkWorkerParallelism(); void setSdkWorkerParallelism(@Nullable String parallelism); + + @Description("Duration in milliseconds for environment cache within a job. 0 means no caching.") + @Default.Integer(0) + int getEnvironmentCacheMillis(); + + void setEnvironmentCacheMillis(int environmentCacheMillis); } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 154119) Time Spent: 2h (was: 1h 50m) > Support caching of SDKHarness environments in flink > --------------------------------------------------- > > Key: BEAM-5708 > URL: https://issues.apache.org/jira/browse/BEAM-5708 > Project: Beam > Issue Type: Improvement > Components: runner-flink > Reporter: Ankur Goenka > Assignee: Ankur Goenka > Priority: Major > Time Spent: 2h > Remaining Estimate: 0h > > Cache and reuse environment to improve performance. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005)