[https://issues.apache.org/jira/browse/BEAM-5605?focusedWorklogId=370936&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-370936]
ASF GitHub Bot logged work on BEAM-5605:
----------------------------------------
Author: ASF GitHub Bot
Created on: 13/Jan/20 17:32
Start Date: 13/Jan/20 17:32
Worklog Time Spent: 10m
Work Description: lukecwik commented on pull request #10501: [BEAM-5605] Add support for channel splitting to the gRPC read "source" and propagate "split" calls to the downstream receiver
URL: https://github.com/apache/beam/pull/10501#discussion_r365932235
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
##########
@@ -170,7 +181,109 @@ public void registerInputLocation() {
apiServiceDescriptor,
LogicalEndpoint.of(processBundleInstructionIdSupplier.get(),
pTransformId),
coder,
- consumer);
+ this::forwardElementToConsumer);
+ }
+
+ public void forwardElementToConsumer(WindowedValue<OutputT> element) throws Exception {
+ synchronized (splittingLock) {
+ if (index == stopIndex - 1) {
+ return;
+ }
+ index += 1;
+ }
+ consumer.accept(element);
+ }
+
+ public void split(
+ ProcessBundleSplitRequest request, ProcessBundleSplitResponse.Builder response) {
+ DesiredSplit desiredSplit = request.getDesiredSplitsMap().get(pTransformId);
+ if (desiredSplit == null) {
+ return;
+ }
+
+ long totalBufferSize = desiredSplit.getEstimatedInputElements();
+
+ HandlesSplits splittingConsumer = null;
+ if (consumer instanceof HandlesSplits) {
+ splittingConsumer = ((HandlesSplits) consumer);
+ }
+
+ synchronized (splittingLock) {
+ // Since we hold the splittingLock, we guarantee that we will not pass the next element
+ // to the downstream consumer. We still have a race where the downstream consumer may
+ // have yet to see the element or has completed processing the element by the time
+ // we ask it to split (even after we have asked for its progress).
+
+ // If the split request we received was delayed and is less than the known number of elements,
+ // then use "index + 1" as the total size. Similarly, if we have already split and the
+ // split request is bounded incorrectly, use the stop index as the upper bound.
+ if (totalBufferSize < index + 1) {
+ totalBufferSize = index + 1;
+ } else if (totalBufferSize > stopIndex) {
+ totalBufferSize = stopIndex;
+ }
+
+ // In the case where we have yet to process an element, set the current element progress to 1.
Review comment:
It's logically the same, since the else clause is the default given what we initialize.
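The forwarding gate and the buffer-size clamp in the hunk above can be exercised in isolation with a small stand-alone class. This is a sketch only: `SplitIndexSketch`, `clampBufferSize`, and `setStopIndex` are hypothetical names, `java.util.function.Consumer` stands in for the harness's downstream receiver, and the initial values (`index = -1`, `stopIndex = Long.MAX_VALUE`) are assumptions the fragment implies but does not show. It does not reproduce the real `BeamFnDataReadRunner` or its `HandlesSplits` delegation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for the read runner's split bookkeeping; not Beam's actual class. */
public class SplitIndexSketch<T> {
  private final Object splittingLock = new Object();
  // Assumption: index of the last forwarded element, starting at -1 (nothing forwarded yet).
  private long index = -1;
  // Assumption: exclusive bound on element indices; unbounded until a split lands.
  private long stopIndex = Long.MAX_VALUE;
  private final Consumer<T> consumer;

  public SplitIndexSketch(Consumer<T> consumer) {
    this.consumer = consumer;
  }

  /** Same gate as forwardElementToConsumer: drop anything at or past stopIndex. */
  public void forward(T element) {
    synchronized (splittingLock) {
      if (index == stopIndex - 1) {
        return;
      }
      index += 1;
    }
    // Invoked outside the lock so a concurrent split is not blocked by downstream work.
    consumer.accept(element);
  }

  /** Same clamp as split(): keep the estimate within [index + 1, stopIndex]. */
  public long clampBufferSize(long estimatedInputElements) {
    synchronized (splittingLock) {
      long totalBufferSize = estimatedInputElements;
      if (totalBufferSize < index + 1) {
        totalBufferSize = index + 1;
      } else if (totalBufferSize > stopIndex) {
        totalBufferSize = stopIndex;
      }
      return totalBufferSize;
    }
  }

  /** Hypothetical helper: record a split point; it may not cut off elements already forwarded. */
  public void setStopIndex(long newStopIndex) {
    synchronized (splittingLock) {
      if (newStopIndex < index + 1) {
        throw new IllegalArgumentException("stop index precedes forwarded elements");
      }
      stopIndex = newStopIndex;
    }
  }

  public static void main(String[] args) {
    List<String> seen = new ArrayList<>();
    SplitIndexSketch<String> runner = new SplitIndexSketch<>(seen::add);
    runner.forward("a"); // index 0
    runner.forward("b"); // index 1
    System.out.println(runner.clampBufferSize(1)); // stale estimate raised to index + 1: 2
    runner.setStopIndex(3);
    System.out.println(runner.clampBufferSize(10)); // estimate capped at stopIndex: 3
    runner.forward("c"); // index 2, still below the split point
    runner.forward("d"); // dropped: index 2 == stopIndex - 1
    System.out.println(seen); // [a, b, c]
  }
}
```

Holding `splittingLock` only around the index update, and calling the consumer outside it, mirrors the diff: a concurrent split sees a consistent `index` without waiting on downstream processing, which is also why the diff's comment notes the remaining race with the consumer's own progress.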
----------------------------------------------------------------
Issue Time Tracking
-------------------
Worklog Id: (was: 370936)
Time Spent: 5h 50m (was: 5h 40m)
> Support Portable SplittableDoFn for batch
> -----------------------------------------
>
> Key: BEAM-5605
> URL: https://issues.apache.org/jira/browse/BEAM-5605
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-core
> Reporter: Scott Wegner
> Assignee: Luke Cwik
> Priority: Major
> Labels: portability
> Time Spent: 5h 50m
> Remaining Estimate: 0h
>
> Roll-up item tracking work towards supporting portable SplittableDoFn for batch
--
This message was sent by Atlassian Jira
(v8.3.4#803005)