scwhittle commented on code in PR #36283:
URL: https://github.com/apache/beam/pull/36283#discussion_r2378263142


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -765,13 +766,15 @@ public void start(
         Instant processingTime,
         WindmillStateCache.ForKey cacheForKey,
         Watermarks watermarks) {
+      WindmillStateTagUtil windmillStateTagUtil = new WindmillStateTagUtil();

Review Comment:
   Can we have a singleton instance for now since it is stateless instead of 
creating a new object for every bundle?
   
   And just checking, you're planning on changing it to be stateful? If not 
perhaps we could avoid all the plumbing and just reference a singleton 
throughout.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateTagUtil.java:
##########
@@ -65,6 +70,42 @@ static ByteString encodeKey(StateNamespace namespace, 
StateTag<?> address) {
     }
   }
 
+  /**
+   * Produce a state tag that is guaranteed to be unique for the given timer, 
to add a watermark
+   * hold that is only freed after the timer fires.
+   */
+  public ByteString timerHoldTag(WindmillNamespacePrefix prefix, TimerData 
timerData) {
+    String tagString;
+    if ("".equals(timerData.getTimerFamilyId())) {
+      tagString =
+          prefix.byteString().toStringUtf8()

Review Comment:
   can we cache the utf8 string in WindmillNamespacePrefix?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateTagUtil.java:
##########
@@ -21,18 +21,23 @@
 import java.lang.ref.SoftReference;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.dataflow.worker.WindmillNamespacePrefix;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
-class WindmillStateUtil {
+@Internal
+public class WindmillStateTagUtil {

Review Comment:
   comment on thread-safety



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateTagUtil.java:
##########
@@ -65,6 +70,42 @@ static ByteString encodeKey(StateNamespace namespace, 
StateTag<?> address) {
     }
   }
 
+  /**
+   * Produce a state tag that is guaranteed to be unique for the given timer, 
to add a watermark
+   * hold that is only freed after the timer fires.
+   */
+  public ByteString timerHoldTag(WindmillNamespacePrefix prefix, TimerData 
timerData) {
+    String tagString;
+    if ("".equals(timerData.getTimerFamilyId())) {

Review Comment:
   seems more straightforward to use empty()



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateTagUtil.java:
##########
@@ -65,6 +70,42 @@ static ByteString encodeKey(StateNamespace namespace, 
StateTag<?> address) {
     }
   }
 
+  /**
+   * Produce a state tag that is guaranteed to be unique for the given timer, 
to add a watermark
+   * hold that is only freed after the timer fires.
+   */
+  public ByteString timerHoldTag(WindmillNamespacePrefix prefix, TimerData 
timerData) {
+    String tagString;

Review Comment:
   Use StringBuilder?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to