arunpandianp commented on code in PR #38814:
URL: https://github.com/apache/beam/pull/38814#discussion_r3376670805
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -555,7 +666,92 @@ public Map<Long, Pair<Instant, Runnable>> flushState() {
// RestrictionTracker.getProgress() or GetSize() are not defined.
outputBuilder.setSourceBacklogBytes(backlogBytes);
}
- return callbacks;
+
+ this.accumulatedCallbacks.putAll(callbacks);
+
+ outputBuilder.setSourceBytesProcessed(
+ computeSourceBytesProcessed(sourceBytesProcessCounterName));
+ }
+
+ private final long computeSourceBytesProcessed(String sourceBytesCounterName) {
+ if (!(workExecutor instanceof DataflowMapTaskExecutor)) {
+ return 0L;
+ }
+ HashMap<String, ElementCounter> counters =
+ ((DataflowMapTaskExecutor) workExecutor)
+ .getReadOperation()
+ .receivers[0]
+ .getOutputCounters();
+
+ return Optional.ofNullable(counters.get(sourceBytesCounterName))
+ .map(counter -> ((OutputObjectAndByteCounter) counter).getByteCount().getAndReset())
+ .orElse(0L);
+ }
+
+ public Map<Long, Pair<Instant, Runnable>> flushState() {
+ return accumulatedCallbacks;
+ }
+
+ public boolean advance() {
+ return false;
+ }
+
+ private void startForNewKey(Work newWork, WindmillStateReader reader) {
+ if (keySwitchListener != null && this.work != null && this.work != newWork) {
+ keySwitchListener.onKeySwitch(this.work, newWork);
+ }
+ this.key = decodeKey(newWork);
+ this.work = newWork;
+ this.finishKeyCalled = false;
+ this.computationKey = WindmillComputationKey.create(computationId, newWork.getShardedKey());
+
+ this.outputBuilder = createOutputBuilder(newWork);
+ this.outputBuilders.add(this.outputBuilder);
+ newWork.setOnFailureListener(() -> this.workIsFailed = true);
+ this.executedWorks.add(newWork);
+
+ logHotKeyIfDetected(newWork, this.key);
+
+ // Note: We do NOT clear sideInputCache here, allowing Key B to reuse warm side inputs!
+
+ // Re-initialize state cache and state/timer internals across all step contexts
+ Instant processingTime =
+ computeProcessingTime(newWork.getWorkItem().getTimers().getTimersList());
+ if (!getAllStepContexts().isEmpty()) {
+ // This must be only created once for a workItem as token validation will fail if the same
Review Comment:
ActiveWorkState manages the retries and multiple work items for the same key. It
makes sure there is only one active work item at a time for a key. I think we
can rely on that and don't need to add deduping logic in the context.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]