scwhittle commented on code in PR #36576:
URL: https://github.com/apache/beam/pull/36576#discussion_r2468774040


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java:
##########
@@ -92,10 +92,23 @@ public static ConsumerAndMetadata forConsumer(
     public abstract ExecutionStateTracker getExecutionStateTracker();
   }
 
+  @AutoValue
+  abstract static class ExecutionStateKey {
+    public static ExecutionStateKey of(String pTransformId, String 
pTransformUniqueName) {
+      return new AutoValue_PCollectionConsumerRegistry_ExecutionStateKey(
+          pTransformId, pTransformUniqueName);
+    }
+
+    public abstract String getPTransformId();
+
+    public abstract String getPTransformUniqueId();
+  }
+
   private final ExecutionStateTracker stateTracker;
   private final ShortIdMap shortIdMap;
-  private final Map<String, List<ConsumerAndMetadata>> 
pCollectionIdsToConsumers;
-  private final Map<String, FnDataReceiver> pCollectionIdsToWrappedConsumer;
+  private final Map<String, List<ConsumerAndMetadata>> 
pCollectionIdsToConsumers = new HashMap<>();

Review Comment:
   I will put together a separate PR, looks like it would cleanup all the 
call-sites



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java:
##########
@@ -445,8 +445,20 @@ public <T> void addIncomingTimerEndpoint(
                     String timerFamilyId,
                     org.apache.beam.sdk.coders.Coder<Timer<T>> coder,
                     FnDataReceiver<Timer<T>> receiver) {
+                  ExecutionStateSampler.ExecutionState executionState =
+                      pCollectionConsumerRegistry.getProcessingExecutionState(
+                          pTransformId, pTransform.getUniqueName());
+                  FnDataReceiver<Timer<T>> wrappedReceiver =
+                      (Timer<T> timer) -> {
+                        executionState.activate();

Review Comment:
   I will put together a separate PR, looks like it would cleanup all the 
call-sites



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to