Repository: incubator-beam Updated Branches: refs/heads/master 529bcdf56 -> 5d78420bf
Use a weakValues LoadingCache for serial TransformExecutorServices This allows the garbage collector to clean up references to TransformExecutorServices which are not currently in use. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6f526374 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6f526374 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6f526374 Branch: refs/heads/master Commit: 6f526374fbc743a0d22e37ad0f746f0a695785dd Parents: ad58e26 Author: Thomas Groh <tg...@google.com> Authored: Tue Mar 29 10:56:10 2016 -0700 Committer: Luke Cwik <lc...@google.com> Committed: Fri Apr 8 14:20:20 2016 -0700 ---------------------------------------------------------------------- .../ExecutorServiceParallelExecutor.java | 36 ++++++++++++++------ 1 file changed, 25 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6f526374/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java index 4d45e8f..c770735 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java @@ -31,6 +31,9 @@ import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.cloud.dataflow.sdk.values.PValue; import com.google.common.base.MoreObjects; import com.google.common.base.Optional; +import 
com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; import com.google.common.collect.ImmutableList; import org.joda.time.Instant; @@ -69,7 +72,7 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor { private final InProcessEvaluationContext evaluationContext; - private final ConcurrentMap<StepAndKey, TransformExecutorService> currentEvaluations; + private final LoadingCache<StepAndKey, TransformExecutorService> executorServices; private final ConcurrentMap<TransformExecutor<?>, Boolean> scheduledExecutors; private final Queue<ExecutorUpdate> allUpdates; @@ -107,8 +110,12 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor { this.transformEnforcements = transformEnforcements; this.evaluationContext = context; - currentEvaluations = new ConcurrentHashMap<>(); scheduledExecutors = new ConcurrentHashMap<>(); + // Weak Values allows TransformExecutorServices that are no longer in use to be reclaimed. 
+ // Executing TransformExecutors have a strong reference to their TransformExecutorService + // which stops the TransformExecutorServices from being prematurely garbage collected + executorServices = + CacheBuilder.newBuilder().weakValues().build(serialTransformExecutorServiceCacheLoader()); this.allUpdates = new ConcurrentLinkedQueue<>(); this.visibleUpdates = new ArrayBlockingQueue<>(20); @@ -118,6 +125,16 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor { defaultCompletionCallback = new DefaultCompletionCallback(); } + private CacheLoader<StepAndKey, TransformExecutorService> + serialTransformExecutorServiceCacheLoader() { + return new CacheLoader<StepAndKey, TransformExecutorService>() { + @Override + public TransformExecutorService load(StepAndKey stepAndKey) throws Exception { + return TransformExecutorServices.serial(executorService, scheduledExecutors); + } + }; + } + @Override public void start(Collection<AppliedPTransform<?, ?, ?>> roots) { rootNodes = ImmutableList.copyOf(roots); @@ -142,7 +159,12 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor { if (bundle != null && isKeyed(bundle.getPCollection())) { final StepAndKey stepAndKey = StepAndKey.of(transform, bundle == null ? null : bundle.getKey()); - transformExecutor = getSerialExecutorService(stepAndKey); + // This executor will remain reachable until it has executed all scheduled transforms. + // The TransformExecutors keep a strong reference to the Executor, the ExecutorService keeps + // a reference to the scheduled TransformExecutor callable. Follow-up TransformExecutors + // (scheduled due to the completion of another TransformExecutor) are provided to the + // ExecutorService before the earlier TransformExecutor callable completes. 
+ transformExecutor = executorServices.getUnchecked(stepAndKey); } else { transformExecutor = parallelExecutorService; } @@ -174,14 +196,6 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor { } } - private TransformExecutorService getSerialExecutorService(StepAndKey stepAndKey) { - if (!currentEvaluations.containsKey(stepAndKey)) { - currentEvaluations.putIfAbsent( - stepAndKey, TransformExecutorServices.serial(executorService, scheduledExecutors)); - } - return currentEvaluations.get(stepAndKey); - } - @Override public void awaitCompletion() throws Throwable { VisibleExecutorUpdate update;