scwhittle commented on code in PR #37604:
URL: https://github.com/apache/beam/pull/37604#discussion_r2821454496


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java:
##########
@@ -58,22 +59,19 @@ public abstract class WindmillTagEncoding {
    * @param timerTag tag of the timer that maps to the hold.
    */
   public abstract ByteString timerHoldTag(
-      WindmillNamespacePrefix prefix, TimerData timerData, ByteString 
timerTag);
+      WindmillTimerType windmillTimerType, TimerData timerData, ByteString 
timerTag);
 
   /**
    * Produce a tag that is guaranteed to be unique for the given prefix, 
namespace, domain and

Review Comment:
   Please update this comment: it still refers to the "prefix", but the 
parameter is now a timer type.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java:
##########
@@ -225,15 +225,16 @@ public TimerData windmillTimerToTimerData(
     }
 
     StateNamespace namespace = StateNamespaces.fromString(namespaceString, 
windowCoder);
-    return TimerData.of(
-        timerId,
-        timerFamily,
-        namespace,
-        timestamp,
-        outputTimestamp,
-        timerTypeToTimeDomain(timer.getType()));
+    return Pair.of(
+        timerType,
+        TimerData.of(

Review Comment:
   How about putting the timer type in TimerData? The time domain could then 
be a function (rather than a stored field) to keep the size the same, and it 
would be one less allocation than this Pair.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java:
##########
@@ -159,17 +155,21 @@ public TimerData windmillTimerToTimerData(
     //    - the Global StateNamespace is different, and becomes "/"
     //  - the id is totally arbitrary; currently unescaped though that could 
change
 
-    ByteString tag = timer.getTag();
-    checkArgument(
-        tag.startsWith(prefix.byteString()),
-        "Expected timer tag %s to start with prefix %s",
-        tag,
-        prefix.byteString());
+    ByteString tag = 
ByteString.copyFrom(timer.getTag().asReadOnlyByteBuffer());

Review Comment:
   If we have the namespacePrefixString method, could we avoid this copy to 
ByteString?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -832,17 +832,13 @@ public <W extends BoundedWindow> TimerData 
getNextFiredTimer(Coder<W> windowCode
       if (cachedFiredSystemTimers == null) {
         cachedFiredSystemTimers =
             
FluentIterable.from(StreamingModeExecutionContext.this.getFiredTimers())
-                .filter(
-                    timer ->
-                        WindmillTimerInternals.isSystemTimer(timer)
-                            && timer.getStateFamily().equals(stateFamily))
+                .filter(timer -> timer.getStateFamily().equals(stateFamily))
                 .transform(
                     timer ->
                         windmillTagEncoding.windmillTimerToTimerData(
-                            WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
-                            timer,
-                            windowCoder,
-                            getDrainMode()))
+                            timer, windowCoder, getDrainMode()))
+                .filter(pair -> pair.getLeft() == 
WindmillTimerType.SYSTEM_TIMER)

Review Comment:
   can this remain above the transform since it's cheaper?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerType.java:
##########
@@ -20,27 +20,24 @@
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
 
-/**
- * A prefix for a Windmill state or timer tag to separate user state and 
timers from system state
- * and timers.
- */
+/** A type for a Windmill timer to separate user state and timers from system 
state and timers. */
 @Internal
-public enum WindmillNamespacePrefix {
-  USER_NAMESPACE_PREFIX {
+public enum WindmillTimerType {
+  USER_TIMER {
     @Override
-    public ByteString byteString() {
+    public ByteString namespacePrefix() {
       return USER_NAMESPACE_BYTESTRING;
     }
   },
 
-  SYSTEM_NAMESPACE_PREFIX {
+  SYSTEM_TIMER {
     @Override
-    public ByteString byteString() {
+    public ByteString namespacePrefix() {
       return SYSTEM_NAMESPACE_BYTESTRING;
     }
   };
 
-  public abstract ByteString byteString();
+  public abstract ByteString namespacePrefix();

Review Comment:
   Maybe this should not be a method on the enum but a static function within 
the V1 tag encoding class?
   That would enforce that elsewhere we are always using the encoding-agnostic 
type.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -895,17 +891,13 @@ public <W extends BoundedWindow> TimerData 
getNextFiredUserTimer(Coder<W> window
         cachedFiredUserTimers =
             Iterators.peekingIterator(
                 
FluentIterable.from(StreamingModeExecutionContext.this.getFiredTimers())
-                    .filter(
-                        timer ->
-                            WindmillTimerInternals.isUserTimer(timer)
-                                && timer.getStateFamily().equals(stateFamily))
+                    .filter(timer -> 
timer.getStateFamily().equals(stateFamily))
                     .transform(
                         timer ->
                             windmillTagEncoding.windmillTimerToTimerData(
-                                WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
-                                timer,
-                                windowCoder,
-                                getDrainMode()))
+                                timer, windowCoder, getDrainMode()))
+                    .filter(pair -> pair.getLeft() == 
WindmillTimerType.USER_TIMER)

Review Comment:
   Ditto: can this filter remain above the transform, since it's cheaper?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java:
##########
@@ -60,28 +60,28 @@ class WindmillTimerInternals implements TimerInternals {
   private final Watermarks watermarks;
   private final Instant processingTime;
   private final String stateFamily;
-  private final WindmillNamespacePrefix prefix;
+  private final WindmillTimerType type;
   private final Consumer<TimerData> onTimerModified;
   private final WindmillTagEncoding windmillTagEncoding;
 
   public WindmillTimerInternals(
       String stateFamily, // unique identifies a step
-      WindmillNamespacePrefix prefix, // partitions user and system namespaces 
into "/u" and "/s"
+      WindmillTimerType type, // partitions user and system namespaces into 
"/u" and "/s"

Review Comment:
   remove the /u and /s specific comment?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java:
##########
@@ -71,11 +70,11 @@ public InternedByteString stateTag(StateNamespace 
namespace, StateTag<?> address
   /** {@inheritDoc} */
   @Override
   public ByteString timerHoldTag(
-      WindmillNamespacePrefix prefix, TimerData timerData, ByteString 
unusedTimerTag) {
+      WindmillTimerType windmillTimerType, TimerData timerData, ByteString 
unusedTimerTag) {
     String tagString;
     if ("".equals(timerData.getTimerFamilyId())) {
       tagString =
-          prefix.byteString().toStringUtf8()
+          windmillTimerType.namespacePrefix().toStringUtf8()

Review Comment:
   We could add a namespacePrefixString method on the timer type to avoid 
doing this same conversion many times.



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java:
##########
@@ -415,7 +416,7 @@ public static Collection<Object[]> data() {
     public StateNamespace namespace;
 
     @Parameter(1)
-    public WindmillNamespacePrefix prefix;
+    public WindmillTimerType prefix;

Review Comment:
   Please rename this field: it is now a WindmillTimerType, not a prefix.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to