[
https://issues.apache.org/jira/browse/BEAM-8619?focusedWorklogId=347835&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-347835
]
ASF GitHub Bot logged work on BEAM-8619:
----------------------------------------
Author: ASF GitHub Bot
Created on: 22/Nov/19 00:37
Start Date: 22/Nov/19 00:37
Worklog Time Spent: 10m
Work Description: lukecwik commented on pull request #10126: [BEAM-8619]
Tear down the DoFns upon the control service termination …
URL: https://github.com/apache/beam/pull/10126#discussion_r349383237
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
##########
@@ -363,16 +392,43 @@ private BundleProcessor
createBundleProcessor(BeamFnApi.InstructionRequest reque
tearDownFunctions::add,
splitListener);
}
- return BundleProcessor.create(
- startFunctionRegistry,
- finishFunctionRegistry,
- tearDownFunctions,
- allResiduals,
- pCollectionConsumerRegistry,
- metricsContainerRegistry,
- stateTracker,
- beamFnStateClient,
- queueingClient);
+ return bundleProcessor;
+ }
+
+ /** A cache for {@link BundleProcessor}s. */
+ private static class BundleProcessorCache {
+
+ private final Map<String, ConcurrentLinkedQueue<BundleProcessor>>
cachedBundleProcessors;
+
+ BundleProcessorCache() {
+ this.cachedBundleProcessors = Maps.newConcurrentMap();
+ }
+
+ /**
+ * Get a {@link BundleProcessor} from the cache if it's available.
Otherwise, create one using
+ * the specified bundleProcessorSupplier.
+ */
+ BundleProcessor get(
+ String bundleDescriptorId, Supplier<BundleProcessor>
bundleProcessorSupplier) {
+ ConcurrentLinkedQueue<BundleProcessor> bundleProcessors =
+ cachedBundleProcessors.computeIfAbsent(
+ bundleDescriptorId, descriptorId -> new
ConcurrentLinkedQueue<>());
+ BundleProcessor bundleProcessor = bundleProcessors.poll();
+ if (bundleProcessor != null) {
+ return bundleProcessor;
+ }
+
+ return bundleProcessorSupplier.get();
+ }
+
+ /**
+ * Add a {@link BundleProcessor} to cache. The {@link BundleProcessor}
will be reset before
+ * added to the cache.
Review comment:
`added to the cache.` -> `being added to the cache.`
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 347835)
Time Spent: 1.5h (was: 1h 20m)
> Tear down the DoFns upon the control service termination in Java SDK harness
> ----------------------------------------------------------------------------
>
> Key: BEAM-8619
> URL: https://issues.apache.org/jira/browse/BEAM-8619
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-harness
> Affects Versions: 2.18.0
> Reporter: sunjincheng
> Assignee: sunjincheng
> Priority: Major
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
> Per the discussion in the ML, the detail can be found [1], the teardown of
> DoFns should be supported in the portability framework. It happens at two
> places:
> 1) Upon the control service termination
> 2) Tear down the unused DoFns periodically
> The aim of this JIRA is to add support for teardown the DoFns upon the
> control service termination in Java SDK harness.
> [1]
> https://lists.apache.org/thread.html/0c4a4cf83cf2e35c3dfeb9d906e26cd82d3820968ba6f862f91739e4@%3Cdev.beam.apache.org%3E
--
This message was sent by Atlassian Jira
(v8.3.4#803005)