This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5494f114382 [Dataflow Streaming] Invalidate caches and remove work on failure before commit (#30229)
5494f114382 is described below
commit 5494f1143827e0e6fec9e331b93c00c83d10c66e
Author: Arun Pandian <[email protected]>
AuthorDate: Tue Feb 6 03:34:48 2024 -0800
[Dataflow Streaming] Invalidate caches and remove work on failure before commit (#30229)
* Invalidate caches and remove work on failure before commit
* Prevent completeWorkAndScheduleNextWorkForKey from throwing
---------
Co-authored-by: Arun Pandian <[email protected]>
---
.../dataflow/worker/StreamingDataflowWorker.java | 15 +++++++++++----
.../dataflow/worker/streaming/ActiveWorkState.java | 17 +++++++----------
2 files changed, 18 insertions(+), 14 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 3ba27bd852f..14efdcc5eb0 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -1397,12 +1397,21 @@ public class StreamingDataflowWorker {
// Adds the commit to the commitStream if it fits, returning true iff it is consumed.
private boolean addCommitToStream(Commit commit, CommitWorkStream
commitStream) {
Preconditions.checkNotNull(commit);
+ final ComputationState state = commit.computationState();
+ final Windmill.WorkItemCommitRequest request = commit.request();
// Drop commits for failed work. Such commits will be dropped by Windmill anyway.
if (commit.work().isFailed()) {
+ readerCache.invalidateReader(
+ WindmillComputationKey.create(
+ state.getComputationId(), request.getKey(),
request.getShardingKey()));
+ stateCache
+ .forComputation(state.getComputationId())
+ .invalidate(request.getKey(), request.getShardingKey());
+ state.completeWorkAndScheduleNextWorkForKey(
+ ShardedKey.create(request.getKey(), request.getShardingKey()),
request.getWorkToken());
return true;
}
- final ComputationState state = commit.computationState();
- final Windmill.WorkItemCommitRequest request = commit.request();
+
final int size = commit.getSize();
commit.work().setState(Work.State.COMMITTING);
activeCommitBytes.addAndGet(size);
@@ -1419,8 +1428,6 @@ public class StreamingDataflowWorker {
.invalidate(request.getKey(), request.getShardingKey());
}
activeCommitBytes.addAndGet(-size);
- // This may throw an exception if the commit was not active, which is possible if it
- // was deemed stuck.
state.completeWorkAndScheduleNextWorkForKey(
ShardedKey.create(request.getKey(), request.getShardingKey()),
request.getWorkToken());
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java
index 54942dfeee1..ff46356d956 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java
@@ -188,16 +188,13 @@ public final class ActiveWorkState {
private synchronized void removeCompletedWorkFromQueue(
Queue<Work> workQueue, ShardedKey shardedKey, long workToken) {
- // avoid Preconditions.checkState here to prevent eagerly evaluating the
- // format string parameters for the error message.
- Work completedWork =
- Optional.ofNullable(workQueue.peek())
- .orElseThrow(
- () ->
- new IllegalStateException(
- String.format(
- "Active key %s without work, expected token %d",
- shardedKey, workToken)));
+ Work completedWork = workQueue.peek();
+ if (completedWork == null) {
+ // Work may have been completed due to clearing of stuck commits.
+ LOG.warn(
+ String.format("Active key %s without work, expected token %d",
shardedKey, workToken));
+ return;
+ }
if (completedWork.getWorkItem().getWorkToken() != workToken) {
// Work may have been completed due to clearing of stuck commits.