arunpandianp commented on code in PR #38814:
URL: https://github.com/apache/beam/pull/38814#discussion_r3431470769


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java:
##########
@@ -129,73 +126,86 @@ public static <K, T> WindowingWindmillReader<K, T> create(
     return new WindowingWindmillReader<>(coder, context, 
skipUndecodableElements);
   }
 
+  private KeyedWorkItem<K, T> createKeyedWorkItem() {
+    @SuppressWarnings("unchecked")
+    @Nullable
+    K key = (K) context.getKey();
+    return new WindmillKeyedWorkItem<>(
+        key,
+        context.getWorkItem(),
+        windowCoder,
+        windowsCoder,
+        valueCoder,
+        context.getWindmillTagEncoding(),
+        context.getDrainMode(),
+        skipUndecodableElements.isAccessible()
+            && Boolean.TRUE.equals(skipUndecodableElements.get()));
+  }
+
+  private boolean isEmpty(KeyedWorkItem<K, T> keyedWorkItem) {
+    return Iterables.isEmpty(keyedWorkItem.timersIterable())
+        && Iterables.isEmpty(keyedWorkItem.elementsIterable());
+  }
+
   @Override
   public NativeReaderIterator<WindowedValue<KeyedWorkItem<K, T>>> iterator() 
throws IOException {
-    final K key =
-        keyCoder.decode(
-            checkStateNotNull(context.getSerializedKey()).newInput(), 
Coder.Context.OUTER);
-    final WorkItem workItem = context.getWorkItem();
-    KeyedWorkItem<K, T> keyedWorkItem =
-        new WindmillKeyedWorkItem<>(
-            key,
-            workItem,
-            windowCoder,
-            windowsCoder,
-            valueCoder,
-            context.getWindmillTagEncoding(),
-            context.getDrainMode(),
-            skipUndecodableElements.isAccessible()
-                && Boolean.TRUE.equals(skipUndecodableElements.get()));
-    final boolean isEmptyWorkItem =
-        (Iterables.isEmpty(keyedWorkItem.timersIterable())
-            && Iterables.isEmpty(keyedWorkItem.elementsIterable()));
-    final WindowedValue<KeyedWorkItem<K, T>> value = new 
ValueInEmptyWindows<>(keyedWorkItem);
-
-    // Return a noop iterator when current workitem is an empty workitem.
-    if (isEmptyWorkItem) {
-      return new NativeReaderIterator<WindowedValue<KeyedWorkItem<K, T>>>() {
-        @Override
-        public boolean start() throws IOException {
-          context.finishKey();
-          return false;
+    final KeyedWorkItem<K, T> firstKeyedWorkItem = createKeyedWorkItem();
+    final boolean firstKeyIsEmpty = isEmpty(firstKeyedWorkItem);
+    final WindowedValue<KeyedWorkItem<K, T>> firstValue =
+        new ValueInEmptyWindows<>(firstKeyedWorkItem);
+
+    return new NativeReaderIterator<WindowedValue<KeyedWorkItem<K, T>>>() {
+      private @Nullable WindowedValue<KeyedWorkItem<K, T>> current = null;
+      private boolean started = false;
+
+      @Override
+      public boolean start() throws IOException {
+        if (context.workIsFailed()) {
+          throw new WorkItemCancelledException(
+              checkStateNotNull(context.getWorkItem()).getShardingKey());
         }
-
-        @Override
-        public boolean advance() throws IOException {
+        if (started) {
           return false;
         }
-
-        @Override
-        public WindowedValue<KeyedWorkItem<K, T>> getCurrent() {
-          throw new NoSuchElementException();
+        started = true;
+        if (firstKeyIsEmpty) {
+          return advance(); // Try to transition immediately if the first key 
is empty!
         }
-      };
-    } else {
-      return new NativeReaderIterator<WindowedValue<KeyedWorkItem<K, T>>>() {
-        private @Nullable WindowedValue<KeyedWorkItem<K, T>> current = null;
-
-        @Override
-        public boolean start() throws IOException {
-          current = value;
-          return true;
+        current = firstValue;

Review Comment:
   Yes, done.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java:
##########
@@ -384,89 +381,143 @@ private ExecuteWorkResult executeWork(
 
     try {
       WindmillStateReader stateReader = work.createWindmillStateReader();
-      SideInputStateFetcher localSideInputStateFetcher =
-          
sideInputStateFetcherFactory.createSideInputStateFetcher(work::fetchSideInput);
-
-      // If the read output KVs, then we can decode Windmill's byte key into 
userland
-      // key object and provide it to the execution context for use with 
per-key state.
-      // Otherwise, we pass null.
-      //
-      // The coder type that will be present is:
-      //     WindowedValueCoder(TimerOrElementCoder(KvCoder))
-      Optional<Coder<?>> keyCoder = computationWorkExecutor.keyCoder();
-      @SuppressWarnings("deprecation")
-      @Nullable
-      final Object executionKey =
-          !keyCoder.isPresent() ? null : keyCoder.get().decode(key.newInput(), 
Coder.Context.OUTER);
-
-      if (workItem.hasHotKeyInfo()) {
-        Windmill.HotKeyInfo hotKeyInfo = workItem.getHotKeyInfo();
-        Duration hotKeyAge = Duration.millis(hotKeyInfo.getHotKeyAgeUsec() / 
1000);
-
-        String stepName = 
getShuffleTaskStepName(computationState.getMapTask());
-        if (executionKey != null
-            && (options.isHotKeyLoggingEnabled()
-                || hasExperiment(options, "enable_hot_key_logging"))
-            && keyCoder.isPresent()) {
-          hotKeyLogger.logHotKeyDetection(stepName, hotKeyAge, executionKey);
-        } else {
-          hotKeyLogger.logHotKeyDetection(stepName, hotKeyAge);
-        }
-      }
+
+      KeyTransitionListener keyTransitionListener = 
createKeyTransitionListener();
 
       // Blocks while executing work.
       computationWorkExecutor.executeWork(
-          executionKey, work, stateReader, localSideInputStateFetcher, 
outputBuilder);
+          work, stateReader, workExecutor, handle, keyTransitionListener);
+
+      List<Work> workBatch;
+      List<Windmill.WorkItemCommitRequest> workItemCommits;
+      Map<Long, Pair<Instant, Runnable>> accumulatedCallbacks;
+      long stateBytesRead;
+      {
+        StreamingModeExecutionContext context = 
computationWorkExecutor.context();

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to