arunpandianp commented on code in PR #38814:
URL: https://github.com/apache/beam/pull/38814#discussion_r3371044425


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java:
##########
@@ -388,86 +372,143 @@ private ExecuteWorkResult executeWork(
       SideInputStateFetcher localSideInputStateFetcher =
           sideInputStateFetcherFactory.createSideInputStateFetcher(work::fetchSideInput);
 
-      // If the read output KVs, then we can decode Windmill's byte key into 
userland
-      // key object and provide it to the execution context for use with 
per-key state.
-      // Otherwise, we pass null.
-      //
-      // The coder type that will be present is:
-      //     WindowedValueCoder(TimerOrElementCoder(KvCoder))
-      Optional<Coder<?>> keyCoder = computationWorkExecutor.keyCoder();
-      @SuppressWarnings("deprecation")
-      @Nullable
-      final Object executionKey =
-          !keyCoder.isPresent() ? null : keyCoder.get().decode(key.newInput(), 
Coder.Context.OUTER);
-
-      if (workItem.hasHotKeyInfo()) {
-        Windmill.HotKeyInfo hotKeyInfo = workItem.getHotKeyInfo();
-        Duration hotKeyAge = Duration.millis(hotKeyInfo.getHotKeyAgeUsec() / 
1000);
-
-        String stepName = 
getShuffleTaskStepName(computationState.getMapTask());
-        if (executionKey != null
-            && (options.isHotKeyLoggingEnabled()
-                || hasExperiment(options, "enable_hot_key_logging"))
-            && keyCoder.isPresent()) {
-          hotKeyLogger.logHotKeyDetection(stepName, hotKeyAge, executionKey);
-        } else {
-          hotKeyLogger.logHotKeyDetection(stepName, hotKeyAge);
-        }
-      }
+      StreamingModeExecutionContext.KeySwitchListener keySwitchListener =
+          createKeySwitchListener(computationState);
 
       // Blocks while executing work.
       computationWorkExecutor.executeWork(
-          executionKey, work, stateReader, localSideInputStateFetcher, 
outputBuilder);
+          work, stateReader, localSideInputStateFetcher, workExecutor, handle, 
keySwitchListener);
 
-      if (work.isFailed()) {
-        throw new WorkItemCancelledException(workItem.getShardingKey());
+      StreamingModeExecutionContext context = 
computationWorkExecutor.context();
+      if (context.workIsFailed()) {
+        throw new 
WorkItemCancelledException(work.getWorkItem().getShardingKey());
       }
 
-      // Reports source bytes processed to WorkItemCommitRequest if available.
-      try {
-        long sourceBytesProcessed =
-            computationWorkExecutor.computeSourceBytesProcessed(
-                computationState.sourceBytesProcessCounterName());
-        outputBuilder.setSourceBytesProcessed(sourceBytesProcessed);
-      } catch (Exception e) {
-        LOG.error("{}", e.toString());
-      }
-
-      commitFinalizer.cacheCommitFinalizers(computationWorkExecutor.context().flushState());
+      // Retrieve executed works, output builders, and accumulated callbacks 
from execution context
+      List<Work> workBatch = context.getExecutedWorks();
+      List<Windmill.WorkItemCommitRequest.Builder> outputBuilders = 
context.getOutputBuilders();
+      Map<Long, Pair<Instant, Runnable>> accumulatedCallbacks = 
context.getAccumulatedCallbacks();
 
+      context.clear();
       // Release the execution state for another thread to use.
       computationState.releaseComputationWorkExecutor(computationWorkExecutor);
       computationWorkExecutor = null;
 
-      work.setState(Work.State.COMMIT_QUEUED);
-      outputBuilder.addAllPerWorkItemLatencyAttributions(work.getLatencyAttributions(sampler));
-
       return ExecuteWorkResult.create(
-          outputBuilder, stateReader.getBytesRead() + 
localSideInputStateFetcher.getBytesRead());
+          workBatch,
+          outputBuilders,
+          accumulatedCallbacks,
+          context.getStateBytesRead() + 
localSideInputStateFetcher.getBytesRead());
     } catch (Throwable t) {
       if (computationWorkExecutor != null) {
         // If processing failed due to a thrown exception, close the 
executionState. Do not
         // return/release the executionState back to computationState as that 
will lead to this
         // executionState instance being reused.
-        LOG.debug("Invalidating executor after work item {} failed", 
workItem.getWorkToken(), t);
+        LOG.debug(
+            "Invalidating executor after work item {} failed",
+            work.getWorkItem().getWorkToken(),
+            t);
         computationWorkExecutor.invalidate();
       }
-
       // Re-throw the exception, it will be caught and handled by 
workFailureProcessor downstream.
       throw t;
     }
   }
 
+  private void handleOnlyFinalize(
+      ComputationState computationState, Work work, Windmill.WorkItem 
workItem) {
+    Windmill.WorkItemCommitRequest.Builder outputBuilder =
+        initializeOutputBuilder(workItem.getKey(), workItem);
+    outputBuilder.setSourceStateUpdates(Windmill.SourceState.newBuilder().setOnlyFinalize(true));
+    work.setState(Work.State.COMMIT_QUEUED);
+    work.queueCommit(outputBuilder.build(), computationState);
+  }
+
+  private StageInfo getStageInfo(ComputationState computationState) {
+    MapTask mapTask = computationState.getMapTask();
+    return stageInfoMap.computeIfAbsent(
+        mapTask.getStageName(), s -> StageInfo.create(s, 
mapTask.getSystemName()));
+  }
+
+  private void commitWorkBatch(
+      ComputationState computationState,
+      List<Work> workBatch,
+      List<Windmill.WorkItemCommitRequest.Builder> outputBuilders) {
+    Preconditions.checkState(
+        workBatch.size() == 1, "Expected single-key work batch, got: " + 
workBatch.size());
+    commitSingleKeyWork(computationState, workBatch.get(0), 
outputBuilders.get(0));
+  }
+
+  private void commitSingleKeyWork(
+      ComputationState computationState,
+      Work work,
+      Windmill.WorkItemCommitRequest.Builder commitRequestBuilder) {
+    // Validate the commit request, possibly requesting truncation if the 
commitSize is too large.
+    Windmill.WorkItemCommitRequest validatedCommitRequest =
+        validateCommitRequestSize(
+            commitRequestBuilder.build(), computationState.getComputationId(), 
work.getWorkItem());

Review Comment:
   done.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java:
##########
@@ -388,86 +372,143 @@ private ExecuteWorkResult executeWork(
       SideInputStateFetcher localSideInputStateFetcher =
           sideInputStateFetcherFactory.createSideInputStateFetcher(work::fetchSideInput);
 
-      // If the read output KVs, then we can decode Windmill's byte key into 
userland
-      // key object and provide it to the execution context for use with 
per-key state.
-      // Otherwise, we pass null.
-      //
-      // The coder type that will be present is:
-      //     WindowedValueCoder(TimerOrElementCoder(KvCoder))
-      Optional<Coder<?>> keyCoder = computationWorkExecutor.keyCoder();
-      @SuppressWarnings("deprecation")
-      @Nullable
-      final Object executionKey =
-          !keyCoder.isPresent() ? null : keyCoder.get().decode(key.newInput(), 
Coder.Context.OUTER);
-
-      if (workItem.hasHotKeyInfo()) {
-        Windmill.HotKeyInfo hotKeyInfo = workItem.getHotKeyInfo();
-        Duration hotKeyAge = Duration.millis(hotKeyInfo.getHotKeyAgeUsec() / 
1000);
-
-        String stepName = 
getShuffleTaskStepName(computationState.getMapTask());
-        if (executionKey != null
-            && (options.isHotKeyLoggingEnabled()
-                || hasExperiment(options, "enable_hot_key_logging"))
-            && keyCoder.isPresent()) {
-          hotKeyLogger.logHotKeyDetection(stepName, hotKeyAge, executionKey);
-        } else {
-          hotKeyLogger.logHotKeyDetection(stepName, hotKeyAge);
-        }
-      }
+      StreamingModeExecutionContext.KeySwitchListener keySwitchListener =
+          createKeySwitchListener(computationState);
 
       // Blocks while executing work.
       computationWorkExecutor.executeWork(
-          executionKey, work, stateReader, localSideInputStateFetcher, 
outputBuilder);
+          work, stateReader, localSideInputStateFetcher, workExecutor, handle, 
keySwitchListener);
 
-      if (work.isFailed()) {
-        throw new WorkItemCancelledException(workItem.getShardingKey());
+      StreamingModeExecutionContext context = 
computationWorkExecutor.context();
+      if (context.workIsFailed()) {
+        throw new 
WorkItemCancelledException(work.getWorkItem().getShardingKey());
       }
 
-      // Reports source bytes processed to WorkItemCommitRequest if available.
-      try {
-        long sourceBytesProcessed =
-            computationWorkExecutor.computeSourceBytesProcessed(
-                computationState.sourceBytesProcessCounterName());
-        outputBuilder.setSourceBytesProcessed(sourceBytesProcessed);
-      } catch (Exception e) {
-        LOG.error("{}", e.toString());
-      }
-
-      commitFinalizer.cacheCommitFinalizers(computationWorkExecutor.context().flushState());
+      // Retrieve executed works, output builders, and accumulated callbacks 
from execution context
+      List<Work> workBatch = context.getExecutedWorks();
+      List<Windmill.WorkItemCommitRequest.Builder> outputBuilders = 
context.getOutputBuilders();
+      Map<Long, Pair<Instant, Runnable>> accumulatedCallbacks = 
context.getAccumulatedCallbacks();
 
+      context.clear();
       // Release the execution state for another thread to use.
       computationState.releaseComputationWorkExecutor(computationWorkExecutor);
       computationWorkExecutor = null;
 
-      work.setState(Work.State.COMMIT_QUEUED);
-      outputBuilder.addAllPerWorkItemLatencyAttributions(work.getLatencyAttributions(sampler));
-
       return ExecuteWorkResult.create(
-          outputBuilder, stateReader.getBytesRead() + 
localSideInputStateFetcher.getBytesRead());
+          workBatch,
+          outputBuilders,
+          accumulatedCallbacks,
+          context.getStateBytesRead() + 
localSideInputStateFetcher.getBytesRead());
     } catch (Throwable t) {
       if (computationWorkExecutor != null) {
         // If processing failed due to a thrown exception, close the 
executionState. Do not
         // return/release the executionState back to computationState as that 
will lead to this
         // executionState instance being reused.
-        LOG.debug("Invalidating executor after work item {} failed", 
workItem.getWorkToken(), t);
+        LOG.debug(
+            "Invalidating executor after work item {} failed",
+            work.getWorkItem().getWorkToken(),
+            t);
         computationWorkExecutor.invalidate();
       }
-
       // Re-throw the exception, it will be caught and handled by 
workFailureProcessor downstream.
       throw t;
     }
   }
 
+  private void handleOnlyFinalize(
+      ComputationState computationState, Work work, Windmill.WorkItem 
workItem) {
+    Windmill.WorkItemCommitRequest.Builder outputBuilder =
+        initializeOutputBuilder(workItem.getKey(), workItem);
+    outputBuilder.setSourceStateUpdates(Windmill.SourceState.newBuilder().setOnlyFinalize(true));
+    work.setState(Work.State.COMMIT_QUEUED);
+    work.queueCommit(outputBuilder.build(), computationState);
+  }
+
+  private StageInfo getStageInfo(ComputationState computationState) {
+    MapTask mapTask = computationState.getMapTask();
+    return stageInfoMap.computeIfAbsent(
+        mapTask.getStageName(), s -> StageInfo.create(s, 
mapTask.getSystemName()));
+  }
+
+  private void commitWorkBatch(
+      ComputationState computationState,
+      List<Work> workBatch,
+      List<Windmill.WorkItemCommitRequest.Builder> outputBuilders) {
+    Preconditions.checkState(
+        workBatch.size() == 1, "Expected single-key work batch, got: " + 
workBatch.size());
+    commitSingleKeyWork(computationState, workBatch.get(0), 
outputBuilders.get(0));
+  }
+
+  private void commitSingleKeyWork(
+      ComputationState computationState,
+      Work work,
+      Windmill.WorkItemCommitRequest.Builder commitRequestBuilder) {
+    // Validate the commit request, possibly requesting truncation if the 
commitSize is too large.
+    Windmill.WorkItemCommitRequest validatedCommitRequest =
+        validateCommitRequestSize(
+            commitRequestBuilder.build(), computationState.getComputationId(), 
work.getWorkItem());
+    work.setState(Work.State.COMMIT_QUEUED);
+    validatedCommitRequest =
+        validatedCommitRequest
+            .toBuilder()
+            .addAllPerWorkItemLatencyAttributions(work.getLatencyAttributions(sampler))
+            .build();
+    work.queueCommit(validatedCommitRequest, computationState);
+  }
+
+  private void recordProcessingTime(
+      StageInfo stageInfo,
+      @Nullable List<Work> worksToCleanup,
+      Work work,
+      long processingStartTimeNanos) {
+    // Update total processing time counters. Updating in finally clause 
ensures that
+    // work items causing exceptions are also accounted in time spent.
+    long processingTimeMsecs =
+        TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
processingStartTimeNanos);
+    stageInfo.totalProcessingMsecs().addValue(processingTimeMsecs);
+    if (anyWorkHasTimers(worksToCleanup, work)) {
+      // Attribute all the processing to timers if the work item contains any 
timers.
+      // Tests show that work items rarely contain both timers and message 
bundles. It should
+      // be a fairly close approximation.
+      // Another option: Derive time split between messages and timers based 
on recent totals.
+      // either here or in DFE.
+      stageInfo.timerProcessingMsecs().addValue(processingTimeMsecs);
+    }
+  }
+
+  private static boolean anyWorkHasTimers(@Nullable List<Work> works, Work 
primaryWork) {
+    if (works != null && !works.isEmpty()) {
+      return works.stream().anyMatch(w -> w.getWorkItem().hasTimers());
+    }
+    return primaryWork.getWorkItem().hasTimers();
+  }
+
+  private StreamingModeExecutionContext.KeySwitchListener 
createKeySwitchListener(
+      ComputationState computationState) {
+    return (oldWork, newWork) -> {
+      resetWorkLoggingContext();

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to