[ https://issues.apache.org/jira/browse/BEAM-5605?focusedWorklogId=370221&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-370221 ]
ASF GitHub Bot logged work on BEAM-5605:
----------------------------------------
Author: ASF GitHub Bot
Created on: 11/Jan/20 01:49
Start Date: 11/Jan/20 01:49
Worklog Time Spent: 10m
Work Description: robertwb commented on pull request #10501: [BEAM-5605]
Add support for channel splitting to the gRPC read "source" and propagate
"split" calls to the downstream receiver
URL: https://github.com/apache/beam/pull/10501#discussion_r365488343
##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
##########
@@ -170,7 +181,109 @@ public void registerInputLocation() {
apiServiceDescriptor,
LogicalEndpoint.of(processBundleInstructionIdSupplier.get(),
pTransformId),
coder,
- consumer);
+ this::forwardElementToConsumer);
+ }
+
+ public void forwardElementToConsumer(WindowedValue<OutputT> element) throws Exception {
+ synchronized (splittingLock) {
+ if (index == stopIndex - 1) {
+ return;
+ }
+ index += 1;
+ }
+ consumer.accept(element);
+ }
+
+ public void split(
+ ProcessBundleSplitRequest request, ProcessBundleSplitResponse.Builder response) {
+ DesiredSplit desiredSplit = request.getDesiredSplitsMap().get(pTransformId);
+ if (desiredSplit == null) {
+ return;
+ }
+
+ long totalBufferSize = desiredSplit.getEstimatedInputElements();
+
+ HandlesSplits splittingConsumer = null;
+ if (consumer instanceof HandlesSplits) {
+ splittingConsumer = ((HandlesSplits) consumer);
+ }
+
+ synchronized (splittingLock) {
+ // Since we hold the splittingLock, we guarantee that we will not pass the next element
+ // to the downstream consumer. We still have a race where the downstream consumer may
+ // have yet to see the element or has completed processing the element by the time
+ // we ask it to split (even after we have asked for its progress).
+
+ // If the split request we received was delayed and is less than the known number of elements
+ // then use "index + 1" as the total size. Similarly, if we have already split and the
+ // split request is bounded incorrectly, use the stop index as the upper bound.
+ if (totalBufferSize < index + 1) {
+ totalBufferSize = index + 1;
+ } else if (totalBufferSize > stopIndex) {
+ totalBufferSize = stopIndex;
+ }
+
+ // In the case where we have yet to process an element, set the current element progress to 1.
+ double currentElementProgress = 1;
+
+ // If we have started processing at least one element, attempt to get the downstream
+ // progress, defaulting to 0.5 if no progress could be fetched.
+ if (index >= 0) {
+ if (splittingConsumer != null) {
+ currentElementProgress = splittingConsumer.getProgress();
+ } else {
+ currentElementProgress = 0.5;
+ }
+ }
+
+ checkArgument(
+ desiredSplit.getAllowedSplitPointsList().isEmpty(),
+ "TODO: BEAM-3836, support split point restrictions.");
+
+ // Now figure out where to split.
+ //
+ // The units here (except for keepOfElementRemainder) are all in terms of number of
+ // (possibly fractional) elements.
+
+ // Compute the amount of "remaining" work that we know of.
+ double remainder = totalBufferSize - index - currentElementProgress;
+ // Compute the fraction of work that we should "keep".
Review comment:
Compute the number of elements that we should "keep."
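The reviewer's point is easier to see with numbers. `remainder` is measured in elements, so scaling it by the requested fraction gives a count of elements to keep, not a fraction. Below is a minimal, self-contained Java sketch that replays the quoted bookkeeping outside the harness.

The sketch makes two assumptions:

* The multiplier is the `fraction_of_remainder` field of `DesiredSplit`. The quoted hunk does not show this field.
* An `OptionalDouble` progress value stands in for the `HandlesSplits` consumer.

`ChannelSplitSketch`, `forward`, `keepElements` and `setStopIndex` are hypothetical names, not Beam APIs.

```java
import java.util.OptionalDouble;

// Hypothetical stand-alone model of the bookkeeping in the quoted hunk; not a Beam API.
class ChannelSplitSketch {
  private final Object splittingLock = new Object();
  // Index of the last element handed downstream; -1 means nothing has been forwarded yet.
  private long index = -1;
  // Exclusive bound: elements at or beyond this index are not forwarded.
  private long stopIndex = Long.MAX_VALUE;

  // Mirrors forwardElementToConsumer: returns false (instead of calling the consumer)
  // once the next element would pass the stop index.
  boolean forward() {
    synchronized (splittingLock) {
      if (index == stopIndex - 1) {
        return false;
      }
      index += 1;
    }
    return true;
  }

  // Hypothetical helper standing in for the part of split() that moves the stop index,
  // which the quoted hunk does not show.
  void setStopIndex(long newStopIndex) {
    synchronized (splittingLock) {
      stopIndex = newStopIndex;
    }
  }

  // Replays the quoted arithmetic. downstreamProgress is empty when the consumer cannot
  // report progress; fractionOfRemainder is assumed to come from DesiredSplit.
  // Returns a (possibly fractional) number of elements, not a fraction.
  double keepElements(
      long estimatedInputElements, double fractionOfRemainder, OptionalDouble downstreamProgress) {
    synchronized (splittingLock) {
      // Clamp a stale or out-of-range estimate to [index + 1, stopIndex].
      long totalBufferSize = estimatedInputElements;
      if (totalBufferSize < index + 1) {
        totalBufferSize = index + 1;
      } else if (totalBufferSize > stopIndex) {
        totalBufferSize = stopIndex;
      }
      // Before any element is forwarded, index is -1 and a progress of 1 cancels it out.
      double currentElementProgress = 1;
      if (index >= 0) {
        currentElementProgress = downstreamProgress.orElse(0.5);
      }
      // Remaining work in elements, then the number of elements to keep.
      double remainder = totalBufferSize - index - currentElementProgress;
      return remainder * fractionOfRemainder;
    }
  }
}
```

Consider a concrete case. The estimate is 10 elements and four elements have been forwarded (`index == 3`). The current element is half done and the requested fraction is 0.5. The remainder is 10 - 3 - 0.5 = 6.5 elements, so the sketch keeps 3.25 elements. Before anything is forwarded, `index == -1` and the progress of 1 cancel out. The remainder is then the full estimate.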
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
Issue Time Tracking
-------------------
Worklog Id: (was: 370221)
Time Spent: 5h 40m (was: 5.5h)
> Support Portable SplittableDoFn for batch
> -----------------------------------------
>
> Key: BEAM-5605
> URL: https://issues.apache.org/jira/browse/BEAM-5605
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-core
> Reporter: Scott Wegner
> Assignee: Luke Cwik
> Priority: Major
> Labels: portability
> Time Spent: 5h 40m
> Remaining Estimate: 0h
>
> Roll-up item tracking work towards supporting portable SplittableDoFn for
> batch
--
This message was sent by Atlassian Jira
(v8.3.4#803005)