xinyuiscool commented on code in PR #22976:
URL: https://github.com/apache/beam/pull/22976#discussion_r982688831


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java:
##########
@@ -279,35 +298,49 @@ public void setTimer(TimerData timerData) {
       // persist it first
       state.persist(keyedTimerData);
 
-      // TO-DO: apply the same memory optimization over processing timers
+      // add timer data to buffer where applicable
       switch (timerData.getDomain()) {
         case EVENT_TIME:
           /*
            * To determine if the upcoming KeyedTimerData could be added to the Buffer while
            * guaranteeing the Buffer's timestamps are all <= than those in State Store to preserve
            * timestamp eviction priority:
            *
-           * <p>1) If maxEventTimeInBuffer == long.MAX_VALUE, it indicates that the State is empty,
-           * therefore all the Event times greater or lesser than newTimestamp are in the buffer;
-           *
-           * <p>2) If newTimestamp < maxEventTimeInBuffer, it indicates that there are entries
-           * greater than newTimestamp, so it is safe to add it to the buffer
+           * <p>If eventTimeBuffer size < maxEventTimerBufferSize and there are no more potential earlier timers
+           * in state, or newTimestamp < maxEventTimeInBuffer then it indicates that there are entries greater than
+           * newTimestamp, so it is safe to add it to the buffer
            *
            * <p>In case that the Buffer is full, we remove the largest timer from memory according
            * to {@link KeyedTimerData.compareTo()}
            */
-          if (newTimestamp < maxEventTimeInBuffer) {
+          if ((maxEventTimerBufferSize > eventTimeBuffer.size() && !eventsTimersInState)
+              || newTimestamp < eventTimeBuffer.last().getTimerData().getTimestamp().getMillis()) {

Review Comment:
   eventTimeBuffer.last() will throw an exception if the set is empty, right?

   The updated code is far less readable than the previous condition. I have no idea what's going on in such a complex nested if statement. @dxichen: please think about how others can understand this piece of code and maintain it in the long run. The changes need to be very readable and easy to debug; otherwise they'll increase maintenance overhead.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java:
##########
@@ -279,35 +298,49 @@ public void setTimer(TimerData timerData) {
       // persist it first
       state.persist(keyedTimerData);
 
-      // TO-DO: apply the same memory optimization over processing timers
+      // add timer data to buffer where applicable
       switch (timerData.getDomain()) {
         case EVENT_TIME:
           /*
            * To determine if the upcoming KeyedTimerData could be added to the Buffer while
            * guaranteeing the Buffer's timestamps are all <= than those in State Store to preserve
            * timestamp eviction priority:
            *
-           * <p>1) If maxEventTimeInBuffer == long.MAX_VALUE, it indicates that the State is empty,
-           * therefore all the Event times greater or lesser than newTimestamp are in the buffer;
-           *
-           * <p>2) If newTimestamp < maxEventTimeInBuffer, it indicates that there are entries
-           * greater than newTimestamp, so it is safe to add it to the buffer
+           * <p>If eventTimeBuffer size < maxEventTimerBufferSize and there are no more potential earlier timers
+           * in state, or newTimestamp < maxEventTimeInBuffer then it indicates that there are entries greater than
+           * newTimestamp, so it is safe to add it to the buffer
            *
            * <p>In case that the Buffer is full, we remove the largest timer from memory according
            * to {@link KeyedTimerData.compareTo()}
            */
-          if (newTimestamp < maxEventTimeInBuffer) {
+          if ((maxEventTimerBufferSize > eventTimeBuffer.size() && !eventsTimersInState)
+              || newTimestamp < eventTimeBuffer.last().getTimerData().getTimestamp().getMillis()) {
             eventTimeBuffer.add(keyedTimerData);
             if (eventTimeBuffer.size() > maxEventTimerBufferSize) {
               eventTimeBuffer.pollLast();
-              maxEventTimeInBuffer =
-                  eventTimeBuffer.last().getTimerData().getTimestamp().getMillis();
             }
+          } else {
+            eventsTimersInState = true;

Review Comment:
   Hmm, I think we persist the timers in state regardless of whether we put them in the buffer. Why do we need this flag here? Can we just learn this by checking whether the state is empty? The more flags in the code, the messier the logic will be.
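Following up on the question above: since `state.persist(keyedTimerData)` runs before any buffering, the buffer is always a subset of the persisted timers, so "some timers exist only in state" can be derived instead of tracked in a flag. The hypothetical sketch below assumes the store can report its size cheaply (a `TreeSet` stands in for it; the real Samza-backed state may not offer a cheap count) and that each timestamp identifies one timer. `DerivedUnbufferedCheck` and `hasUnbufferedTimers` are illustrative names, and firing or deleting timers is not modeled.

```java
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical model: each timestamp stands for one timer, and a TreeSet stands in for the
// persisted timer state. Firing and deleting timers are not modeled.
final class DerivedUnbufferedCheck {
  final NavigableSet<Long> state = new TreeSet<>();
  final NavigableSet<Long> buffer = new TreeSet<>();
  private final int capacity;

  DerivedUnbufferedCheck(int capacity) {
    this.capacity = capacity;
  }

  // Every buffered timer was persisted first, so the buffer is a subset of the state and
  // "some timers exist only in state" reduces to a size comparison.
  boolean hasUnbufferedTimers() {
    return state.size() > buffer.size();
  }

  void setTimer(long timestamp) {
    // Evaluated before persisting, while the new timer is not yet counted.
    boolean bufferHoldsAllTimers = !hasUnbufferedTimers();
    state.add(timestamp); // persist unconditionally, as the PR does
    boolean firesBeforeLatestBuffered = !buffer.isEmpty() && timestamp < buffer.last();
    boolean roomLeft = buffer.size() < capacity;
    if (firesBeforeLatestBuffered || (bufferHoldsAllTimers && roomLeft)) {
      buffer.add(timestamp);
      if (buffer.size() > capacity) {
        // The evicted timer stays in state; hasUnbufferedTimers() reflects it with no flag update.
        buffer.pollLast();
      }
    }
  }
}
```

The design choice here is that eviction needs no bookkeeping: the size comparison notices it automatically. If a cheap count is not available from state, this approach does not apply directly.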



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java:
##########
@@ -279,35 +298,49 @@ public void setTimer(TimerData timerData) {
       // persist it first
       state.persist(keyedTimerData);
 
-      // TO-DO: apply the same memory optimization over processing timers
+      // add timer data to buffer where applicable
       switch (timerData.getDomain()) {
         case EVENT_TIME:
           /*
            * To determine if the upcoming KeyedTimerData could be added to the Buffer while
            * guaranteeing the Buffer's timestamps are all <= than those in State Store to preserve
            * timestamp eviction priority:
            *
-           * <p>1) If maxEventTimeInBuffer == long.MAX_VALUE, it indicates that the State is empty,
-           * therefore all the Event times greater or lesser than newTimestamp are in the buffer;
-           *
-           * <p>2) If newTimestamp < maxEventTimeInBuffer, it indicates that there are entries
-           * greater than newTimestamp, so it is safe to add it to the buffer
+           * <p>If eventTimeBuffer size < maxEventTimerBufferSize and there are no more potential earlier timers
+           * in state, or newTimestamp < maxEventTimeInBuffer then it indicates that there are entries greater than
+           * newTimestamp, so it is safe to add it to the buffer
            *
            * <p>In case that the Buffer is full, we remove the largest timer from memory according
            * to {@link KeyedTimerData.compareTo()}
            */
-          if (newTimestamp < maxEventTimeInBuffer) {
+          if ((maxEventTimerBufferSize > eventTimeBuffer.size() && !eventsTimersInState)
+              || newTimestamp < eventTimeBuffer.last().getTimerData().getTimestamp().getMillis()) {
             eventTimeBuffer.add(keyedTimerData);
             if (eventTimeBuffer.size() > maxEventTimerBufferSize) {
               eventTimeBuffer.pollLast();
-              maxEventTimeInBuffer =
-                  eventTimeBuffer.last().getTimerData().getTimestamp().getMillis();
             }
+          } else {
+            eventsTimersInState = true;
           }
           break;
 
         case PROCESSING_TIME:
-          timerRegistry.schedule(keyedTimerData, timerData.getTimestamp().getMillis());
+          // The timer is added to the buffer if the new timestamp will be triggered earlier than
+          // the oldest time in the current buffer.
+          // Any timers removed from the buffer will also be deleted from the scheduler.
+          if ((maxProcessTimerBufferSize > processTimeBuffer.size() && !processTimersInState)

Review Comment:
   This condition is not readable. Rewrite.
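A hypothetical sketch of a more readable processing-time path: early returns replace the compound condition, and one helper keeps the buffer and the scheduler in sync on eviction, matching the intent of the PR's comment that timers removed from the buffer are also deleted from the scheduler. The `scheduled` set stands in for `timerRegistry`; `maybeSchedule`, `belongsInBuffer`, and `evictOverflow` are illustrative names, not existing methods, and timestamps stand in for `KeyedTimerData`.

```java
import java.util.HashSet;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of the processing-time path; `scheduled` stands in for timerRegistry.
final class ProcessingTimeBuffer {
  final NavigableSet<Long> buffer = new TreeSet<>();
  final Set<Long> scheduled = new HashSet<>();
  private final int capacity;

  ProcessingTimeBuffer(int capacity) {
    this.capacity = capacity;
  }

  /** Buffers and schedules the timer when it belongs in memory; returns whether it was scheduled. */
  boolean maybeSchedule(long timestamp, boolean stateHasUnbufferedTimers) {
    if (!belongsInBuffer(timestamp, stateHasUnbufferedTimers)) {
      return false; // stays in state only; a later reload is expected to schedule it
    }
    buffer.add(timestamp);
    scheduled.add(timestamp);
    evictOverflow();
    return true;
  }

  private boolean belongsInBuffer(long timestamp, boolean stateHasUnbufferedTimers) {
    if (!buffer.isEmpty() && timestamp < buffer.last()) {
      return true; // fires before the latest buffered timer
    }
    // Otherwise only when nothing is waiting in state and there is spare capacity.
    return !stateHasUnbufferedTimers && buffer.size() < capacity;
  }

  /** Whatever leaves the buffer is also unscheduled, so the two never drift apart. */
  private void evictOverflow() {
    while (buffer.size() > capacity) {
      scheduled.remove(buffer.pollLast());
    }
  }
}
```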



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java:
##########
@@ -539,63 +599,76 @@ private void reloadEventTimeTimers() {
         LOG.debug(
             "Event time timers in State is empty, filled {} timers out of {} buffer capacity",
             eventTimeBuffer.size(),
-            maxEventTimeInBuffer);
+            maxEventTimerBufferSize);
         // Reset the flag variable to indicate there are no more KeyedTimerData in State
-        maxEventTimeInBuffer = Long.MAX_VALUE;
+        eventsTimersInState = false;

Review Comment:
   There seems to be some misunderstanding in the logic here. We are not removing timers from the state, so they are still there.
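A minimal model of the reload step, assuming the buffer starts empty and the state iterator yields timers in timestamp order (`Long` timestamps and `TreeSet`s stand in for `KeyedTimerData` and the timestamp-sorted timer state; `TimerReload.reload` is a hypothetical name). It only reads the state, so every timer remains persisted afterwards; an under-filled buffer means all persisted timers are now in memory, not that the state is empty.

```java
import java.util.Iterator;
import java.util.NavigableSet;

// Hypothetical model of the reload step; Long timestamps stand in for KeyedTimerData.
final class TimerReload {
  /**
   * Copies the earliest persisted timers into an initially empty buffer, up to capacity.
   * The state is only read, never modified. Returns true if every persisted timer is now buffered.
   */
  static boolean reload(NavigableSet<Long> sortedState, NavigableSet<Long> buffer, int capacity) {
    Iterator<Long> timers = sortedState.iterator();
    while (timers.hasNext() && buffer.size() < capacity) {
      buffer.add(timers.next());
    }
    // An exhausted iterator means no timer exists only in state; it does not mean state is empty.
    return !timers.hasNext();
  }
}
```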



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java:
##########
@@ -279,35 +298,49 @@ public void setTimer(TimerData timerData) {
       // persist it first
       state.persist(keyedTimerData);
 
-      // TO-DO: apply the same memory optimization over processing timers
+      // add timer data to buffer where applicable
       switch (timerData.getDomain()) {
         case EVENT_TIME:
           /*
            * To determine if the upcoming KeyedTimerData could be added to the Buffer while
            * guaranteeing the Buffer's timestamps are all <= than those in State Store to preserve
            * timestamp eviction priority:
            *
-           * <p>1) If maxEventTimeInBuffer == long.MAX_VALUE, it indicates that the State is empty,
-           * therefore all the Event times greater or lesser than newTimestamp are in the buffer;
-           *
-           * <p>2) If newTimestamp < maxEventTimeInBuffer, it indicates that there are entries
-           * greater than newTimestamp, so it is safe to add it to the buffer
+           * <p>If eventTimeBuffer size < maxEventTimerBufferSize and there are no more potential earlier timers
+           * in state, or newTimestamp < maxEventTimeInBuffer then it indicates that there are entries greater than
+           * newTimestamp, so it is safe to add it to the buffer
            *
            * <p>In case that the Buffer is full, we remove the largest timer from memory according
            * to {@link KeyedTimerData.compareTo()}
            */
-          if (newTimestamp < maxEventTimeInBuffer) {
+          if ((maxEventTimerBufferSize > eventTimeBuffer.size() && !eventsTimersInState)
+              || newTimestamp < eventTimeBuffer.last().getTimerData().getTimestamp().getMillis()) {
             eventTimeBuffer.add(keyedTimerData);
             if (eventTimeBuffer.size() > maxEventTimerBufferSize) {
               eventTimeBuffer.pollLast();
-              maxEventTimeInBuffer =
-                  eventTimeBuffer.last().getTimerData().getTimestamp().getMillis();
             }
+          } else {
+            eventsTimersInState = true;

Review Comment:
   The timers are always in state. The flag name is just wrong.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java:
##########
@@ -539,63 +599,76 @@ private void reloadEventTimeTimers() {
         LOG.debug(
             "Event time timers in State is empty, filled {} timers out of {} buffer capacity",
             eventTimeBuffer.size(),
-            maxEventTimeInBuffer);
+            maxEventTimerBufferSize);
         // Reset the flag variable to indicate there are no more KeyedTimerData in State
-        maxEventTimeInBuffer = Long.MAX_VALUE;
+        eventsTimersInState = false;
       }
     }
 
-    private void loadProcessingTimeTimers() {
-      final Iterator<Map.Entry<TimerKey<K>, Long>> iter =
-          processingTimeTimerState.readIterator().read();
-      // since the iterator will reach to the end, it will be closed automatically
-      int count = 0;
-      while (iter.hasNext()) {
-        final Map.Entry<TimerKey<K>, Long> entry = iter.next();
-        final KeyedTimerData keyedTimerData =
-            TimerKey.toKeyedTimerData(
-                entry.getKey(), entry.getValue(), TimeDomain.PROCESSING_TIME, keyCoder);
+    private void reloadProcessingTimeTimers() {
+      final Iterator<KeyedTimerData<K>> iter =
+          timestampSortedProcessTimeTimerState.readIterator().read();
 
+      while (iter.hasNext() && processTimeBuffer.size() < maxProcessTimerBufferSize) {
+        final KeyedTimerData keyedTimerData = iter.next();
+        processTimeBuffer.add(keyedTimerData);
         timerRegistry.schedule(
             keyedTimerData, keyedTimerData.getTimerData().getTimestamp().getMillis());
-        ++count;
       }
-      processingTimeTimerState.closeIterators();
+      timestampSortedProcessTimeTimerState.closeIterators();
+      LOG.info("Loaded {} processing time timers in memory", processTimeBuffer.size());
 
-      LOG.info("Loaded {} processing time timers in memory", count);
+      if (processTimeBuffer.size() < maxProcessTimerBufferSize) {
+        LOG.debug(
+            "Process time timers in State is empty, filled {} timers out of {} buffer capacity",
+            processTimeBuffer.size(),
+            maxProcessTimerBufferSize);
+        // Reset the flag variable to indicate there are no more KeyedTimerData in State
+        processTimersInState = false;

Review Comment:
   Same comment as above. I don't like adding these random flags either.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java:
##########
@@ -279,35 +298,49 @@ public void setTimer(TimerData timerData) {
       // persist it first
       state.persist(keyedTimerData);
 
-      // TO-DO: apply the same memory optimization over processing timers
+      // add timer data to buffer where applicable
       switch (timerData.getDomain()) {
         case EVENT_TIME:
           /*
            * To determine if the upcoming KeyedTimerData could be added to the Buffer while
            * guaranteeing the Buffer's timestamps are all <= than those in State Store to preserve
            * timestamp eviction priority:
            *
-           * <p>1) If maxEventTimeInBuffer == long.MAX_VALUE, it indicates that the State is empty,
-           * therefore all the Event times greater or lesser than newTimestamp are in the buffer;
-           *
-           * <p>2) If newTimestamp < maxEventTimeInBuffer, it indicates that there are entries
-           * greater than newTimestamp, so it is safe to add it to the buffer
+           * <p>If eventTimeBuffer size < maxEventTimerBufferSize and there are no more potential earlier timers
+           * in state, or newTimestamp < maxEventTimeInBuffer then it indicates that there are entries greater than
+           * newTimestamp, so it is safe to add it to the buffer
            *
            * <p>In case that the Buffer is full, we remove the largest timer from memory according
            * to {@link KeyedTimerData.compareTo()}
            */
-          if (newTimestamp < maxEventTimeInBuffer) {
+          if ((maxEventTimerBufferSize > eventTimeBuffer.size() && !eventsTimersInState)
+              || newTimestamp < eventTimeBuffer.last().getTimerData().getTimestamp().getMillis()) {
             eventTimeBuffer.add(keyedTimerData);
             if (eventTimeBuffer.size() > maxEventTimerBufferSize) {
               eventTimeBuffer.pollLast();
-              maxEventTimeInBuffer =
-                  eventTimeBuffer.last().getTimerData().getTimestamp().getMillis();
             }
+          } else {
+            eventsTimersInState = true;
           }
           break;
 
         case PROCESSING_TIME:
-          timerRegistry.schedule(keyedTimerData, timerData.getTimestamp().getMillis());
+          // The timer is added to the buffer if the new timestamp will be triggered earlier than
+          // the oldest time in the current buffer.
+          // Any timers removed from the buffer will also be deleted from the scheduler.
+          if ((maxProcessTimerBufferSize > processTimeBuffer.size() && !processTimersInState)
+              || newTimestamp
+                  < processTimeBuffer.last().getTimerData().getTimestamp().getMillis()) {
+            processTimeBuffer.add(keyedTimerData);
+            timerRegistry.schedule(keyedTimerData, newTimestamp);
+            if (processTimeBuffer.size() > maxProcessTimerBufferSize) {
+              KeyedTimerData oldKeyedTimerData = processTimeBuffer.pollLast();
+              timerRegistry.delete(oldKeyedTimerData);
+            }
+          } else {
+            processTimersInState = true;

Review Comment:
   I think this is useless.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java:
##########
@@ -279,35 +298,49 @@ public void setTimer(TimerData timerData) {
       // persist it first
       state.persist(keyedTimerData);
 
-      // TO-DO: apply the same memory optimization over processing timers
+      // add timer data to buffer where applicable
       switch (timerData.getDomain()) {
         case EVENT_TIME:
           /*
            * To determine if the upcoming KeyedTimerData could be added to the Buffer while
            * guaranteeing the Buffer's timestamps are all <= than those in State Store to preserve
            * timestamp eviction priority:
            *
-           * <p>1) If maxEventTimeInBuffer == long.MAX_VALUE, it indicates that the State is empty,
-           * therefore all the Event times greater or lesser than newTimestamp are in the buffer;
-           *
-           * <p>2) If newTimestamp < maxEventTimeInBuffer, it indicates that there are entries
-           * greater than newTimestamp, so it is safe to add it to the buffer
+           * <p>If eventTimeBuffer size < maxEventTimerBufferSize and there are no more potential earlier timers
+           * in state, or newTimestamp < maxEventTimeInBuffer then it indicates that there are entries greater than
+           * newTimestamp, so it is safe to add it to the buffer
            *
            * <p>In case that the Buffer is full, we remove the largest timer from memory according
            * to {@link KeyedTimerData.compareTo()}
            */
-          if (newTimestamp < maxEventTimeInBuffer) {
+          if ((maxEventTimerBufferSize > eventTimeBuffer.size() && !eventsTimersInState)
+              || newTimestamp < eventTimeBuffer.last().getTimerData().getTimestamp().getMillis()) {
             eventTimeBuffer.add(keyedTimerData);
             if (eventTimeBuffer.size() > maxEventTimerBufferSize) {
               eventTimeBuffer.pollLast();
-              maxEventTimeInBuffer =
-                  eventTimeBuffer.last().getTimerData().getTimestamp().getMillis();
             }
+          } else {
+            eventsTimersInState = true;
           }
           break;
 
         case PROCESSING_TIME:
-          timerRegistry.schedule(keyedTimerData, timerData.getTimestamp().getMillis());
+          // The timer is added to the buffer if the new timestamp will be triggered earlier than
+          // the oldest time in the current buffer.
+          // Any timers removed from the buffer will also be deleted from the scheduler.
+          if ((maxProcessTimerBufferSize > processTimeBuffer.size() && !processTimersInState)
+              || newTimestamp
+                  < processTimeBuffer.last().getTimerData().getTimestamp().getMillis()) {
+            processTimeBuffer.add(keyedTimerData);
+            timerRegistry.schedule(keyedTimerData, newTimestamp);
+            if (processTimeBuffer.size() > maxProcessTimerBufferSize) {
+              KeyedTimerData oldKeyedTimerData = processTimeBuffer.pollLast();
+              timerRegistry.delete(oldKeyedTimerData);
+            }
+          } else {
+            processTimersInState = true;

Review Comment:
   These flags are just a recipe for disaster. Plus, we are preserving the timers in state no matter what.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java:
##########
@@ -279,35 +298,49 @@ public void setTimer(TimerData timerData) {
       // persist it first
       state.persist(keyedTimerData);
 
-      // TO-DO: apply the same memory optimization over processing timers
+      // add timer data to buffer where applicable
       switch (timerData.getDomain()) {
         case EVENT_TIME:
           /*
            * To determine if the upcoming KeyedTimerData could be added to the Buffer while
            * guaranteeing the Buffer's timestamps are all <= than those in State Store to preserve
            * timestamp eviction priority:
            *
-           * <p>1) If maxEventTimeInBuffer == long.MAX_VALUE, it indicates that the State is empty,
-           * therefore all the Event times greater or lesser than newTimestamp are in the buffer;
-           *
-           * <p>2) If newTimestamp < maxEventTimeInBuffer, it indicates that there are entries
-           * greater than newTimestamp, so it is safe to add it to the buffer
+           * <p>If eventTimeBuffer size < maxEventTimerBufferSize and there are no more potential earlier timers
+           * in state, or newTimestamp < maxEventTimeInBuffer then it indicates that there are entries greater than
+           * newTimestamp, so it is safe to add it to the buffer
            *
            * <p>In case that the Buffer is full, we remove the largest timer from memory according
            * to {@link KeyedTimerData.compareTo()}
            */
-          if (newTimestamp < maxEventTimeInBuffer) {
+          if ((maxEventTimerBufferSize > eventTimeBuffer.size() && !eventsTimersInState)
+              || newTimestamp < eventTimeBuffer.last().getTimerData().getTimestamp().getMillis()) {
             eventTimeBuffer.add(keyedTimerData);
             if (eventTimeBuffer.size() > maxEventTimerBufferSize) {
               eventTimeBuffer.pollLast();
-              maxEventTimeInBuffer =
-                  eventTimeBuffer.last().getTimerData().getTimestamp().getMillis();
             }
+          } else {
+            eventsTimersInState = true;
           }
           break;
 
         case PROCESSING_TIME:
-          timerRegistry.schedule(keyedTimerData, timerData.getTimestamp().getMillis());
+          // The timer is added to the buffer if the new timestamp will be triggered earlier than
+          // the oldest time in the current buffer.
+          // Any timers removed from the buffer will also be deleted from the scheduler.
+          if ((maxProcessTimerBufferSize > processTimeBuffer.size() && !processTimersInState)

Review Comment:
   Again, a large chunk of nested if-else which is probably only understandable to you. Rewrite.



-- 
This is an automated message from the Apache Git Service.