[
https://issues.apache.org/jira/browse/BEAM-8554?focusedWorklogId=342964&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-342964
]
ASF GitHub Bot logged work on BEAM-8554:
----------------------------------------
Author: ASF GitHub Bot
Created on: 13/Nov/19 22:13
Start Date: 13/Nov/19 22:13
Worklog Time Spent: 10m
Work Description: stevekoonce commented on pull request #10013:
[BEAM-8554] Use WorkItemCommitRequest protobuf fields to signal that …
URL: https://github.com/apache/beam/pull/10013#discussion_r346030994
##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
##########
@@ -1330,20 +1334,19 @@ private void process(
     WorkItemCommitRequest commitRequest = outputBuilder.build();
     int byteLimit = maxWorkItemCommitBytes;
     int commitSize = commitRequest.getSerializedSize();
+    int estimatedCommitSize = commitSize < 0 ? Integer.MAX_VALUE : commitSize;
+
     // Detect overflow of integer serialized size or if the byte limit was exceeded.
-    windmillMaxObservedWorkItemCommitBytes.addValue(
-        commitSize < 0 ? Integer.MAX_VALUE : commitSize);
-    if (commitSize < 0) {
-      throw KeyCommitTooLargeException.causedBy(computationId, byteLimit, commitRequest);
-    } else if (commitSize > byteLimit) {
-      // Once supported, we should communicate the desired truncation for the commit to the
-      // streaming engine. For now we report the error but attempt the commit so that it will be
-      // truncated by the streaming engine backend.
+    windmillMaxObservedWorkItemCommitBytes.addValue(estimatedCommitSize);
+    if (estimatedCommitSize > byteLimit) {
       KeyCommitTooLargeException e =
           KeyCommitTooLargeException.causedBy(computationId, byteLimit, commitRequest);
       reportFailure(computationId, workItem, e);
       LOG.error(e.toString());
+
+      commitRequest = buildWorkItemTruncationRequest(key, workItem, estimatedCommitSize);
Review comment:
All of the 'doomed' messages, timers, counters, etc. are being cleared as
well. It would be a little more efficient to clear them explicitly rather than
start over from a fresh builder, but that would be less future-proof: as new
fields are added to the proto, they would also need to be cleared out here.
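
For illustration, a minimal sketch of the 'start over' approach described
above. This is not the actual Beam implementation: the method shape follows
buildWorkItemTruncationRequest from the diff, but the Windmill proto setters
used here (setKey, setShardingKey, setWorkToken,
setExceedsMaxWorkItemCommitBytes, setEstimatedWorkItemCommitBytes) are
assumptions, not confirmed API.

// Assumed imports (sketch): com.google.protobuf.ByteString and the generated
// Windmill proto classes already used elsewhere in StreamingDataflowWorker.
private static Windmill.WorkItemCommitRequest buildWorkItemTruncationRequest(
    final ByteString key, final Windmill.WorkItem workItem, final int estimatedCommitSize) {
  // Starting from a fresh builder implicitly drops the oversized messages,
  // timers, counters, etc., and stays correct as new fields are added to the
  // proto: there is nothing to remember to clear here.
  return Windmill.WorkItemCommitRequest.newBuilder()
      .setKey(key)
      .setShardingKey(workItem.getShardingKey())
      .setWorkToken(workItem.getWorkToken())
      // Hypothetical flags asking the service to break the work item up.
      .setExceedsMaxWorkItemCommitBytes(true)
      .setEstimatedWorkItemCommitBytes(estimatedCommitSize)
      .build();
}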
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 342964)
Time Spent: 3h 20m (was: 3h 10m)
> Use WorkItemCommitRequest protobuf fields to signal that a WorkItem needs to be broken up
> -----------------------------------------------------------------------------------------
>
> Key: BEAM-8554
> URL: https://issues.apache.org/jira/browse/BEAM-8554
> Project: Beam
> Issue Type: Improvement
> Components: runner-dataflow
> Reporter: Steve Koonce
> Priority: Minor
> Time Spent: 3h 20m
> Remaining Estimate: 0h
>
> +Background:+
> When a WorkItemCommitRequest is generated that's bigger than the permitted
> size (> ~180 MB), a KeyCommitTooLargeException is logged (_not thrown_) and
> the request is still sent to the service. The service rejects the commit but
> breaks up the input messages that were bundled together and adds them to new,
> smaller work items that will later be pulled and retried, likely without
> generating another commit that is too large.
> When a WorkItemCommitRequest is generated that's too large to be sent back to
> the service (> 2 GB), a KeyCommitTooLargeException is thrown and nothing is
> sent back to the service.
>
> +Proposed Improvement+
> In both cases, prevent the doomed, oversized commit from being sent back to
> the service. Instead, send flags in the commit request signaling that the
> current work item produced a commit that was too large and that the work item
> should be broken up.
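
For illustration, a rough sketch of the proposed commit path. Names follow the
diff above (buildWorkItemTruncationRequest, the truncation flag, and the final
send step are assumptions about the eventual code, not confirmed Beam API):

// Sketch of the proposed flow inside the worker's process() path.
WorkItemCommitRequest commitRequest = outputBuilder.build();
int commitSize = commitRequest.getSerializedSize();
// Per the diff, a negative serialized size indicates integer overflow
// (a proto larger than ~2 GB), so treat it as "definitely too big".
int estimatedCommitSize = commitSize < 0 ? Integer.MAX_VALUE : commitSize;
if (estimatedCommitSize > maxWorkItemCommitBytes) {
  // Report the oversized commit for visibility...
  reportFailure(
      computationId,
      workItem,
      KeyCommitTooLargeException.causedBy(computationId, maxWorkItemCommitBytes, commitRequest));
  // ...then replace it with a small request that only carries the key and a
  // flag asking the service to break the work item up, instead of shipping
  // (or silently dropping) the doomed payload.
  commitRequest = buildWorkItemTruncationRequest(key, workItem, estimatedCommitSize);
}
// Hypothetical send step: the (possibly replaced) request goes back to the service.
commitQueue.put(commitRequest);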
--
This message was sent by Atlassian Jira
(v8.3.4#803005)