This is an automated email from the ASF dual-hosted git repository.
reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new cac59bc [BEAM-9660]: Add an explicit check for integer overflow.
new 05258c7 Merge pull request #11289 from spoortikundargi/patch-1
cac59bc is described below
commit cac59bc585d8c7eb71fe4a893b3b337c630287d4
Author: Spoorti Kundargi <[email protected]>
AuthorDate: Wed Apr 1 20:12:03 2020 -0700
[BEAM-9660]: Add an explicit check for integer overflow.
If `commitSize` is less than zero (due to overflow of integer serialized
size), the existing code was setting it to `Integer.MAX_VALUE` and using the
`estimatedCommitSize > byteLimit` check to throw an exception. However, in some
cases in Dataflow Streaming Applicance, `byteLimit` is set to
`Integer.MAX_VALUE` and so the check `estimatedCommitSize > byteLimit` fails to
detect integer overflow.
---
.../apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index f2b0b27..8d2b6a1 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -1376,7 +1376,7 @@ public class StreamingDataflowWorker {
// Detect overflow of integer serialized size or if the byte limit was
exceeded.
windmillMaxObservedWorkItemCommitBytes.addValue(estimatedCommitSize);
- if (estimatedCommitSize > byteLimit) {
+ if (commitSize < 0 || commitSize > byteLimit) {
KeyCommitTooLargeException e =
KeyCommitTooLargeException.causedBy(computationId, byteLimit,
commitRequest);
reportFailure(computationId, workItem, e);