This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 23dcb7ec1d5 Revert "When failing work items during commit, make sure
to call completeWork…" (#30228)
23dcb7ec1d5 is described below
commit 23dcb7ec1d539759f5c587a6dfee357ad250db72
Author: Sam Whittle <[email protected]>
AuthorDate: Tue Feb 6 11:23:57 2024 +0100
Revert "When failing work items during commit, make sure to call
completeWork…" (#30228)
This reverts commit b0f2eebb0244302ac2315dc260536512d229401f.
---
.../runners/dataflow/worker/StreamingDataflowWorker.java | 12 ++----------
1 file changed, 2 insertions(+), 10 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index b48032677ff..3ba27bd852f 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -1397,20 +1397,12 @@ public class StreamingDataflowWorker {
// Adds the commit to the commitStream if it fits, returning true iff it is
consumed.
private boolean addCommitToStream(Commit commit, CommitWorkStream
commitStream) {
Preconditions.checkNotNull(commit);
- final ComputationState state = commit.computationState();
- final Windmill.WorkItemCommitRequest request = commit.request();
// Drop commits for failed work. Such commits will be dropped by Windmill
anyway.
if (commit.work().isFailed()) {
- readerCache.invalidateReader(
- WindmillComputationKey.create(
- state.getComputationId(), request.getKey(),
request.getShardingKey()));
- stateCache
- .forComputation(state.getComputationId())
- .invalidate(request.getKey(), request.getShardingKey());
- state.completeWorkAndScheduleNextWorkForKey(
- ShardedKey.create(request.getKey(), request.getShardingKey()),
request.getWorkToken());
return true;
}
+ final ComputationState state = commit.computationState();
+ final Windmill.WorkItemCommitRequest request = commit.request();
final int size = commit.getSize();
commit.work().setState(Work.State.COMMITTING);
activeCommitBytes.addAndGet(size);