scwhittle commented on code in PR #38814:
URL: https://github.com/apache/beam/pull/38814#discussion_r3372601333
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java:
##########
@@ -79,6 +82,8 @@ public final class Work implements RefreshableWork {
private volatile TimedState currentState;
private volatile boolean isFailed;
private volatile String processingThreadName = "";
+ private final AtomicReference<@Nullable AtomicBoolean> onFailureListener =
Review Comment:
Can this be combined with isFailed? When we construct the work, if we know which group it is going to be part of, we could assign the shared AtomicBoolean at that point.
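A minimal sketch of what this suggestion could look like, assuming group membership is known when each Work is constructed. SketchWork and SketchWorkGroup are hypothetical, simplified stand-ins, not Beam classes. Every member of a group receives the same AtomicBoolean, so one flag replaces both isFailed and the separate listener.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for Work: the failure flag is injected at construction
// instead of being tracked by a separate isFailed field plus a listener.
final class SketchWork {
  private final String key;
  // Shared by every SketchWork in the same group; failing one fails all.
  private final AtomicBoolean groupFailed;

  SketchWork(String key, AtomicBoolean groupFailed) {
    this.key = key;
    this.groupFailed = groupFailed;
  }

  String key() {
    return key;
  }

  void setFailed() {
    groupFailed.set(true);
  }

  boolean isFailed() {
    return groupFailed.get();
  }
}

final class SketchWorkGroup {
  // Builds all members of a group around a single shared flag.
  static List<SketchWork> create(List<String> keys) {
    AtomicBoolean shared = new AtomicBoolean(false);
    List<SketchWork> group = new ArrayList<>();
    for (String k : keys) {
      group.add(new SketchWork(k, shared));
    }
    return group;
  }

  public static void main(String[] args) {
    List<SketchWork> group = create(List.of("A", "B", "C"));
    group.get(0).setFailed(); // e.g. key A is failed due to heartbeats
    System.out.println(group.get(2).isFailed()); // C observes A's failure
  }
}
```

The trade-off is that the group must be decided before construction; if work items can join a group later, the mutable variant discussed in the next comment is needed.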
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java:
##########
Review Comment:
Move this next to the other failure methods.
Do we want this to reflect the shared state? It seems that if we are processing a group of keys, are currently on key C, and key A (processed earlier) is marked as failed due to heartbeats, then we'd want to stop processing key C. I think we'd need to return true here to notice that.
If that's true, perhaps we can get rid of the separate isFailed and failure listener and just have a listener? It could be a volatile AtomicBoolean, perhaps: if the listener is set, the work merges its current state into it and replaces its own flag, and if it is set externally to null, the work creates a new AtomicBoolean.
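A rough sketch of the single-listener idea, assuming a group hands out one shared AtomicBoolean. FailureStateWork and setSharedFailureState are hypothetical names, not part of the PR. Joining a group merges any locally recorded failure into the shared flag before adopting it, and passing null gives the item a fresh, independent flag. As noted in the comments, a real implementation would also have to handle setFailed() racing with the swap.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: isFailed and the separate failure listener are replaced
// by one volatile AtomicBoolean that may be shared with other items in a group.
final class FailureStateWork {
  private volatile AtomicBoolean failed = new AtomicBoolean(false);

  // Joins a shared group flag, or detaches when given null.
  // Caveat: a setFailed() racing with this method may write to the old flag;
  // this sketch assumes no such race.
  void setSharedFailureState(AtomicBoolean shared) {
    if (shared == null) {
      // Detached: start from a fresh, independent flag.
      failed = new AtomicBoolean(false);
      return;
    }
    if (failed.get()) {
      // Merge: do not lose a failure recorded before joining the group.
      shared.set(true);
    }
    failed = shared;
  }

  void setFailed() {
    failed.set(true);
  }

  // True if this item, or any item sharing its flag, has failed.
  boolean isFailed() {
    return failed.get();
  }

  public static void main(String[] args) {
    AtomicBoolean group = new AtomicBoolean(false);
    FailureStateWork a = new FailureStateWork();
    FailureStateWork c = new FailureStateWork();
    a.setSharedFailureState(group);
    c.setSharedFailureState(group);
    a.setFailed(); // key A fails while key C is being processed
    System.out.println(c.isFailed()); // C sees the group failure
  }
}
```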
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBase.java:
##########
@@ -57,22 +57,35 @@ public boolean start() throws IOException {
@Override
public boolean advance() throws IOException {
if (context.workIsFailed()) {
- throw new WorkItemCancelledException(context.getWorkItem().getShardingKey());
+ throw new WorkItemCancelledException(checkNotNull(context.getWorkItem()).getShardingKey());
}
while (true) {
if (bundleIndex >= work.getMessageBundlesCount()) {
- current = null;
+ // If elements are exhausted, try advancing the execution context to the next key in the
+ // group
context.finishKey();
+ if (context.advance()) {
+ // Transition succeeded! Update iterator references to the new work item
+ this.work = context.getWork().getWorkItem();
Review Comment:
How about a resetWork method that you call here and also use in the constructor? Then it will be harder to update one spot and forget the other.
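A sketch of the resetWork suggestion. SketchReaderIterator and SketchWorkItem are hypothetical, simplified stand-ins for WindmillReaderIteratorBase and the Windmill work item; the real iterator tracks more state. All per-item initialization lives in resetWork(), which both the constructor and the key transition in advance() call, so the two paths cannot drift apart.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical sketch: one resetWork() method holds all per-work-item
// initialization, shared by the constructor and the key transition in advance().
final class SketchReaderIterator {
  static final class SketchWorkItem {
    final String key;
    final List<String> bundles;

    SketchWorkItem(String key, List<String> bundles) {
      this.key = key;
      this.bundles = bundles;
    }
  }

  private final Deque<SketchWorkItem> remainingKeys;
  private SketchWorkItem work;
  private int bundleIndex;
  private String current;

  SketchReaderIterator(List<SketchWorkItem> group) {
    this.remainingKeys = new ArrayDeque<>(group);
    resetWork(remainingKeys.poll()); // same initialization as a key transition
  }

  // Single place that (re)initializes iterator state for a new work item.
  private void resetWork(SketchWorkItem newWork) {
    this.work = newWork;
    this.bundleIndex = 0;
    this.current = null;
  }

  boolean advance() {
    while (true) {
      if (work != null && bundleIndex < work.bundles.size()) {
        current = work.bundles.get(bundleIndex++);
        return true;
      }
      // Current key exhausted: move to the next key in the group, if any.
      SketchWorkItem next = remainingKeys.poll();
      if (next == null) {
        current = null;
        return false;
      }
      resetWork(next);
    }
  }

  String getCurrent() {
    if (current == null) {
      throw new NoSuchElementException();
    }
    return current;
  }

  public static void main(String[] args) {
    SketchReaderIterator it = new SketchReaderIterator(List.of(
        new SketchWorkItem("A", List.of("a1", "a2")),
        new SketchWorkItem("B", List.of()),
        new SketchWorkItem("C", List.of("c1"))));
    while (it.advance()) {
      System.out.println(it.getCurrent());
    }
  }
}
```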
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java:
##########
Review Comment:
Sounds good; this might be a nice simplification to follow up with.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -555,7 +666,92 @@ public Map<Long, Pair<Instant, Runnable>> flushState() {
// RestrictionTracker.getProgress() or GetSize() are not defined.
outputBuilder.setSourceBacklogBytes(backlogBytes);
}
- return callbacks;
+
+ this.accumulatedCallbacks.putAll(callbacks);
+
+ outputBuilder.setSourceBytesProcessed(
+ computeSourceBytesProcessed(sourceBytesProcessCounterName));
+ }
+
+ private final long computeSourceBytesProcessed(String sourceBytesCounterName) {
+ if (!(workExecutor instanceof DataflowMapTaskExecutor)) {
+ return 0L;
+ }
+ HashMap<String, ElementCounter> counters =
+ ((DataflowMapTaskExecutor) workExecutor)
+ .getReadOperation()
+ .receivers[0]
+ .getOutputCounters();
+
+ return Optional.ofNullable(counters.get(sourceBytesCounterName))
+ .map(counter -> ((OutputObjectAndByteCounter) counter).getByteCount().getAndReset())
+ .orElse(0L);
+ }
+
+ public Map<Long, Pair<Instant, Runnable>> flushState() {
+ return accumulatedCallbacks;
+ }
+
+ public boolean advance() {
+ return false;
+ }
+
+ private void startForNewKey(Work newWork, WindmillStateReader reader) {
+ if (keySwitchListener != null && this.work != null && this.work != newWork) {
+ keySwitchListener.onKeySwitch(this.work, newWork);
+ }
+ this.key = decodeKey(newWork);
+ this.work = newWork;
+ this.finishKeyCalled = false;
+ this.computationKey = WindmillComputationKey.create(computationId, newWork.getShardedKey());
+
+ this.outputBuilder = createOutputBuilder(newWork);
+ this.outputBuilders.add(this.outputBuilder);
+ newWork.setOnFailureListener(() -> this.workIsFailed = true);
+ this.executedWorks.add(newWork);
+
+ logHotKeyIfDetected(newWork, this.key);
+
+ // Note: We do NOT clear sideInputCache here, allowing Key B to reuse warm side inputs!
+
+ // Re-initialize state cache and state/timer internals across all step contexts
+ Instant processingTime =
+ computeProcessingTime(newWork.getWorkItem().getTimers().getTimersList());
+ if (!getAllStepContexts().isEmpty()) {
+ // This must be only created once for a workItem as token validation will fail if the same
Review Comment:
Separate from the logic here, I was wondering whether we want or need to prevent processing a batch that contains the same key multiple times.
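One possible reading of this question is that a single batch should not contain the same key more than once. Assuming that reading, a hypothetical KeyBatcher could split incoming keys so that each batch holds any key at most once; rejecting or deferring duplicates would be alternative policies. Plain strings stand in for sharded keys here, and nothing below reflects how the worker actually forms batches.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: split an ordered sequence of keys into batches so that
// no batch contains the same key twice, preserving the original order.
final class KeyBatcher {
  static List<List<String>> batchWithoutDuplicateKeys(List<String> keys) {
    List<List<String>> batches = new ArrayList<>();
    List<String> currentBatch = new ArrayList<>();
    Set<String> seen = new HashSet<>();
    for (String key : keys) {
      if (!seen.add(key)) {
        // Key already present in this batch: close it and start a new one.
        batches.add(currentBatch);
        currentBatch = new ArrayList<>();
        seen.clear();
        seen.add(key);
      }
      currentBatch.add(key);
    }
    if (!currentBatch.isEmpty()) {
      batches.add(currentBatch);
    }
    return batches;
  }

  public static void main(String[] args) {
    // Prints [[A, B], [A, C]]
    System.out.println(batchWithoutDuplicateKeys(List.of("A", "B", "A", "C")));
  }
}
```

Splitting keeps every work item but may produce smaller batches; a real policy would also need to decide whether a second item for the same key can be ordered after the first within the same processing thread.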