arunpandianp commented on code in PR #38814:
URL: https://github.com/apache/beam/pull/38814#discussion_r3371051915
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -555,7 +666,92 @@ public Map<Long, Pair<Instant, Runnable>> flushState() {
// RestrictionTracker.getProgress() or GetSize() are not defined.
outputBuilder.setSourceBacklogBytes(backlogBytes);
}
- return callbacks;
+
+ this.accumulatedCallbacks.putAll(callbacks);
+
+ outputBuilder.setSourceBytesProcessed(
+ computeSourceBytesProcessed(sourceBytesProcessCounterName));
+ }
+
+ private final long computeSourceBytesProcessed(String
sourceBytesCounterName) {
+ if (!(workExecutor instanceof DataflowMapTaskExecutor)) {
+ return 0L;
+ }
+ HashMap<String, ElementCounter> counters =
+ ((DataflowMapTaskExecutor) workExecutor)
+ .getReadOperation()
+ .receivers[0]
+ .getOutputCounters();
+
+ return Optional.ofNullable(counters.get(sourceBytesCounterName))
+ .map(counter -> ((OutputObjectAndByteCounter)
counter).getByteCount().getAndReset())
+ .orElse(0L);
+ }
+
+ public Map<Long, Pair<Instant, Runnable>> flushState() {
+ return accumulatedCallbacks;
+ }
+
+ public boolean advance() {
+ return false;
+ }
+
+ private void startForNewKey(Work newWork, WindmillStateReader reader) {
+ if (keySwitchListener != null && this.work != null && this.work !=
newWork) {
+ keySwitchListener.onKeySwitch(this.work, newWork);
+ }
+ this.key = decodeKey(newWork);
+ this.work = newWork;
+ this.finishKeyCalled = false;
+ this.computationKey = WindmillComputationKey.create(computationId,
newWork.getShardedKey());
+
+ this.outputBuilder = createOutputBuilder(newWork);
+ this.outputBuilders.add(this.outputBuilder);
+ newWork.setOnFailureListener(() -> this.workIsFailed = true);
+ this.executedWorks.add(newWork);
+
+ logHotKeyIfDetected(newWork, this.key);
+
+ // Note: We do NOT clear sideInputCache here, allowing Key B to reuse warm
side inputs!
+
+ // Re-initialize state cache and state/timer internals across all step
contexts
+ Instant processingTime =
+
computeProcessingTime(newWork.getWorkItem().getTimers().getTimersList());
+ if (!getAllStepContexts().isEmpty()) {
+ // This must be only created once for a workItem as token validation
will fail if the same
+ // work token is reused.
+ WindmillStateCache.ForKey cacheForKey =
+ stateCache.forKey(
+ getComputationKey(), newWork.getWorkItem().getCacheToken(),
getWorkToken());
+ this.activeStateReader = reader;
+ startStepContexts(reader, processingTime, cacheForKey,
newWork.watermarks());
+ } else {
+ this.activeStateReader = null;
+ }
+ }
+
+ public List<Work> getExecutedWorks() {
+ return executedWorks;
+ }
+
+ public long getStateBytesRead() {
+ return stateBytesRead;
+ }
+
+ public List<Windmill.WorkItemCommitRequest.Builder> getOutputBuilders() {
Review Comment:
Updated to return the fully built commits instead of the builders.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBase.java:
##########
@@ -57,22 +57,35 @@ public boolean start() throws IOException {
@Override
public boolean advance() throws IOException {
if (context.workIsFailed()) {
- throw new
WorkItemCancelledException(context.getWorkItem().getShardingKey());
+ throw new
WorkItemCancelledException(checkNotNull(context.getWorkItem()).getShardingKey());
}
while (true) {
if (bundleIndex >= work.getMessageBundlesCount()) {
- current = null;
+ // If elements are exhausted, try advancing the execution context to
the next key in the
+ // group
context.finishKey();
+ if (context.advance()) {
+ // Transition succeeded! Update iterator references to the new work
item
+ this.work = context.getWork().getWorkItem();
+ this.bundleIndex = 0;
+ this.messageIndex = -1;
+ continue;
+ }
+
+ // All work items are exhausted. Iterator returns false.
Review Comment:
Done.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java:
##########
@@ -151,51 +151,65 @@ public
NativeReaderIterator<WindowedValue<KeyedWorkItem<K, T>>> iterator() throw
&& Iterables.isEmpty(keyedWorkItem.elementsIterable()));
final WindowedValue<KeyedWorkItem<K, T>> value = new
ValueInEmptyWindows<>(keyedWorkItem);
- // Return a noop iterator when current workitem is an empty workitem.
- if (isEmptyWorkItem) {
- return new NativeReaderIterator<WindowedValue<KeyedWorkItem<K, T>>>() {
- @Override
- public boolean start() throws IOException {
- context.finishKey();
- return false;
+ return new NativeReaderIterator<WindowedValue<KeyedWorkItem<K, T>>>() {
+ private @Nullable WindowedValue<KeyedWorkItem<K, T>> current = null;
+ private boolean started = false;
+
+ @Override
+ public boolean start() throws IOException {
+ if (context.workIsFailed()) {
+ throw new WorkItemCancelledException(
+ checkStateNotNull(context.getWorkItem()).getShardingKey());
}
-
- @Override
- public boolean advance() throws IOException {
+ if (started) {
return false;
}
-
- @Override
- public WindowedValue<KeyedWorkItem<K, T>> getCurrent() {
- throw new NoSuchElementException();
+ started = true;
+ if (isEmptyWorkItem) {
+ return advance(); // Try to transition immediately if the first key
is empty!
}
- };
- } else {
- return new NativeReaderIterator<WindowedValue<KeyedWorkItem<K, T>>>() {
- private @Nullable WindowedValue<KeyedWorkItem<K, T>> current = null;
-
- @Override
- public boolean start() throws IOException {
- current = value;
- return true;
+ current = value;
+ return true;
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ if (context.workIsFailed()) {
+ throw new WorkItemCancelledException(
+ checkStateNotNull(context.getWorkItem()).getShardingKey());
}
- @Override
- public boolean advance() throws IOException {
- current = null;
- context.finishKey();
- return false;
+ context.finishKey();
+ if (context.advance()) {
+ @SuppressWarnings("unchecked")
+ K newKey = (K) context.getKey();
Review Comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]