mynameborat commented on code in PR #22976:
URL: https://github.com/apache/beam/pull/22976#discussion_r959805133


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java:
##########
@@ -545,57 +596,58 @@ private void reloadEventTimeTimers() {
       }
     }
 
-    private void loadProcessingTimeTimers() {
-      final Iterator<Map.Entry<TimerKey<K>, Long>> iter =
-          processingTimeTimerState.readIterator().read();
-      // since the iterator will reach to the end, it will be closed 
automatically
-      int count = 0;
-      while (iter.hasNext()) {
-        final Map.Entry<TimerKey<K>, Long> entry = iter.next();
-        final KeyedTimerData keyedTimerData =
-            TimerKey.toKeyedTimerData(
-                entry.getKey(), entry.getValue(), TimeDomain.PROCESSING_TIME, 
keyCoder);
-
-        timerRegistry.schedule(
-            keyedTimerData, 
keyedTimerData.getTimerData().getTimestamp().getMillis());
-        ++count;
+    private void reloadProcessingTimeTimers() {
+      final Iterator<KeyedTimerData<K>> iter =
+          timestampSortedProcessTimeTimerState.readIterator().read();
+
+      while (iter.hasNext() && processTimeBuffer.size() < 
maxProcessTimerBufferSize) {
+        final KeyedTimerData keyedTimerData = iter.next();
+        processTimeBuffer.add(keyedTimerData);
       }
-      processingTimeTimerState.closeIterators();
 
-      LOG.info("Loaded {} processing time timers in memory", count);
+      timestampSortedProcessTimeTimerState.closeIterators();
+      LOG.info("Loaded {} processing time timers in memory", 
processTimeBuffer.size());
     }
 
-    /**
-     * Restore timer state from RocksDB. This is needed for migration of 
existing jobs. Give events
-     * in eventTimeTimerState, construct timestampSortedEventTimeTimerState 
preparing for memory
-     * reloading. TO-DO: processing time timers are still loaded into memory 
in one shot; will apply
-     * the same optimization mechanism as event time timer
-     */
+    /** Restore timer state from RocksDB. */
     private void init() {
-      final Iterator<Map.Entry<TimerKey<K>, Long>> eventTimersIter =
-          eventTimeTimerState.readIterator().read();
-      // use hasNext to check empty, because this is relatively cheap compared 
with Iterators.size()
-      if (eventTimersIter.hasNext()) {
-        final Iterator sortedEventTimerIter =
-            timestampSortedEventTimeTimerState.readIterator().read();
-
-        if (!sortedEventTimerIter.hasNext()) {
-          // inline the migration code
-          while (eventTimersIter.hasNext()) {
-            final Map.Entry<TimerKey<K>, Long> entry = eventTimersIter.next();
-            final KeyedTimerData<K> keyedTimerData =
-                TimerKey.toKeyedTimerData(
-                    entry.getKey(), entry.getValue(), TimeDomain.EVENT_TIME, 
keyCoder);
-            timestampSortedEventTimeTimerState.add(keyedTimerData);
-          }
-        }
-        timestampSortedEventTimeTimerState.closeIterators();
-      }
-      eventTimeTimerState.closeIterators();
+      migrateToKeyedTimerState(
+          eventTimeTimerState, timestampSortedEventTimeTimerState, 
TimeDomain.EVENT_TIME);
+      migrateToKeyedTimerState(
+          processingTimeTimerState,
+          timestampSortedProcessTimeTimerState,
+          TimeDomain.PROCESSING_TIME);
 
       reloadEventTimeTimers();
-      loadProcessingTimeTimers();
+      reloadProcessingTimeTimers();
+    }
+  }
+
+  /**
+   * This is needed for migration of existing jobs. Give events in timerState, 
construct
+   * keyedTimerState preparing for memory reloading.
+   */
+  private void migrateToKeyedTimerState(
+      SamzaMapState<TimerKey<K>, Long> timerState,
+      SamzaSetState<KeyedTimerData<K>> keyedTimerState,
+      TimeDomain timeDomain) {
+    final Iterator<Map.Entry<TimerKey<K>, Long>> timersIter = 
timerState.readIterator().read();
+    // use hasNext to check empty, because this is relatively cheap compared 
with Iterators.size()
+    if (timersIter.hasNext()) {
+      final Iterator keyedTimerIter = keyedTimerState.readIterator().read();
+
+      if (!keyedTimerIter.hasNext()) {
+        // Migrate from timerState to keyedTimerState
+        while (timersIter.hasNext()) {
+          final Map.Entry<TimerKey<K>, Long> entry = timersIter.next();
+          final KeyedTimerData<K> keyedTimerData =
+              TimerKey.toKeyedTimerData(entry.getKey(), entry.getValue(), 
timeDomain, keyCoder);
+          keyedTimerState.add(keyedTimerData);
+        }
+      }
+      keyedTimerState.closeIterators();

Review Comment:
   Will this code go away after migration? If so, can we add a TODO and attach 
a JIRA ticket? 



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java:
##########
@@ -545,57 +596,58 @@ private void reloadEventTimeTimers() {
       }
     }
 
-    private void loadProcessingTimeTimers() {
-      final Iterator<Map.Entry<TimerKey<K>, Long>> iter =
-          processingTimeTimerState.readIterator().read();
-      // since the iterator will reach to the end, it will be closed 
automatically
-      int count = 0;
-      while (iter.hasNext()) {
-        final Map.Entry<TimerKey<K>, Long> entry = iter.next();
-        final KeyedTimerData keyedTimerData =
-            TimerKey.toKeyedTimerData(
-                entry.getKey(), entry.getValue(), TimeDomain.PROCESSING_TIME, 
keyCoder);
-
-        timerRegistry.schedule(
-            keyedTimerData, 
keyedTimerData.getTimerData().getTimestamp().getMillis());
-        ++count;
+    private void reloadProcessingTimeTimers() {
+      final Iterator<KeyedTimerData<K>> iter =
+          timestampSortedProcessTimeTimerState.readIterator().read();
+
+      while (iter.hasNext() && processTimeBuffer.size() < 
maxProcessTimerBufferSize) {
+        final KeyedTimerData keyedTimerData = iter.next();
+        processTimeBuffer.add(keyedTimerData);
       }
-      processingTimeTimerState.closeIterators();
 
-      LOG.info("Loaded {} processing time timers in memory", count);
+      timestampSortedProcessTimeTimerState.closeIterators();
+      LOG.info("Loaded {} processing time timers in memory", 
processTimeBuffer.size());

Review Comment:
   Need to close the iterator since the loop can exit early



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java:
##########
@@ -218,6 +235,10 @@ public Instant getOutputWatermark() {
   // for unit test only
   NavigableSet<KeyedTimerData<K>> getEventTimeBuffer() {
     return eventTimeBuffer;
+  } // todo dchen1

Review Comment:
   Remove the `// todo dchen1` comment.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java:
##########
@@ -444,6 +470,16 @@ private class SamzaTimerState {
                           new TimerKeyCoder<>(keyCoder, windowCoder),
                           VarLongCoder.of()));
 
+      this.timestampSortedProcessTimeTimerState =
+          (SamzaSetState<KeyedTimerData<K>>)
+              nonKeyedStateInternalsFactory
+                  .stateInternalsForKey(null)
+                  .state(
+                      StateNamespaces.global(),
+                      StateTags.set(
+                          timerStateId + "-pts",

Review Comment:
   Looks like we don't have a consistent naming scheme anyway and can't change 
`-ts` to `-ets`. Maybe use something more descriptive than `pts` to help 
readability and debuggability.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java:
##########
@@ -307,7 +327,11 @@ public void setTimer(TimerData timerData) {
           break;
 
         case PROCESSING_TIME:
-          timerRegistry.schedule(keyedTimerData, 
timerData.getTimestamp().getMillis());
+          // Append to buffer iff not full
+          if (processTimeBuffer.size() < maxProcessTimerBufferSize) {
+            processTimeBuffer.add(keyedTimerData);
+          }

Review Comment:
   Do we need to do anything when the buffer has reached its maximum size, as 
we do in the event time scenario?



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java:
##########
@@ -279,18 +302,15 @@ public void setTimer(TimerData timerData) {
       // persist it first
       state.persist(keyedTimerData);
 
-      // TO-DO: apply the same memory optimization over processing timers
+      // add timer data to buffer where applicable
       switch (timerData.getDomain()) {
         case EVENT_TIME:
           /*
            * To determine if the upcoming KeyedTimerData could be added to the 
Buffer while
            * guaranteeing the Buffer's timestamps are all <= than those in 
State Store to preserve
            * timestamp eviction priority:
            *
-           * <p>1) If maxEventTimeInBuffer == long.MAX_VALUE, it indicates 
that the State is empty,
-           * therefore all the Event times greater or lesser than newTimestamp 
are in the buffer;
-           *
-           * <p>2) If newTimestamp < maxEventTimeInBuffer, it indicates that 
there are entries

Review Comment:
   Why is this deleted? Does it no longer apply to event time?



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java:
##########
@@ -191,11 +193,26 @@ public Collection<KeyedTimerData<K>> removeReadyTimers() {
         state.reloadEventTimeTimers();
       }
     }
+
+    // Flush all timers for process time
+    final Iterator<KeyedTimerData<K>> processBufferIterator = 
processTimeBuffer.iterator();
+    while (processBufferIterator.hasNext() && readyTimers.size() < 
maxReadyTimersToProcessOnce) {
+      KeyedTimerData<K> processTimerData = processBufferIterator.next();
+      readyTimers.add(processTimerData);
+      processBufferIterator.remove();
+      state.deletePersisted(processTimerData);

Review Comment:
   Given we persist the deletes for the process timers, are the ready timers 
persisted themselves in case of failures? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to