This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 67fbf95  [BEAM-12333] Changing TimerKey to include TimerFamilyId (#14802)
67fbf95 is described below

commit 67fbf9581cd2eb0850e0cdda4463bf0fdbe8bd2d
Author: ajo thomas <[email protected]>
AuthorDate: Mon May 17 16:00:22 2021 -0700

    [BEAM-12333] Changing TimerKey to include TimerFamilyId (#14802)
---
 .../samza/runtime/SamzaTimerInternalsFactory.java  | 125 +++++++++------------
 .../runtime/SamzaTimerInternalsFactoryTest.java    |  20 +++-
 2 files changed, 67 insertions(+), 78 deletions(-)

diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
index ad426f2..2588223 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.samza.runtime;
 
+import com.google.auto.value.AutoValue;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -445,7 +446,7 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
           final Long timestamp = eventTimeTimerState.get(timerKey).read();
 
           if (timestamp != null) {
-            final KeyedTimerData keyedTimerDataInStore =
+            final KeyedTimerData<K> keyedTimerDataInStore =
                 TimerKey.toKeyedTimerData(timerKey, timestamp, TimeDomain.EVENT_TIME, keyCoder);
             timestampSortedEventTimeTimerState.remove(keyedTimerDataInStore);
           }
@@ -553,7 +554,7 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
           // inline the migration code
           while (eventTimersIter.hasNext()) {
             final Map.Entry<TimerKey<K>, Long> entry = eventTimersIter.next();
-            final KeyedTimerData keyedTimerData =
+            final KeyedTimerData<K> keyedTimerData =
                 TimerKey.toKeyedTimerData(
                     entry.getKey(), entry.getValue(), TimeDomain.EVENT_TIME, keyCoder);
             timestampSortedEventTimeTimerState.add(keyedTimerData);
@@ -569,99 +570,66 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
     }
   }
 
-  private static class TimerKey<K> {
-    private final K key;
-    private final StateNamespace stateNamespace;
-    private final String timerId;
+  @AutoValue
+  abstract static class TimerKey<K> {
+    abstract K getKey();
+
+    abstract StateNamespace getStateNamespace();
+
+    abstract String getTimerId();
+
+    abstract String getTimerFamilyId();
+
+    static <K> Builder<K> builder() {
+      return new AutoValue_SamzaTimerInternalsFactory_TimerKey.Builder<>();
+    }
 
     static <K> TimerKey<K> of(KeyedTimerData<K> keyedTimerData) {
       final TimerInternals.TimerData timerData = keyedTimerData.getTimerData();
-      return new TimerKey<>(
-          keyedTimerData.getKey(), timerData.getNamespace(), timerData.getTimerId());
+      return TimerKey.<K>builder()
+          .setKey(keyedTimerData.getKey())
+          .setStateNamespace(timerData.getNamespace())
+          .setTimerId(timerData.getTimerId())
+          .setTimerFamilyId(timerData.getTimerFamilyId())
+          .build();
     }
 
     static <K> KeyedTimerData<K> toKeyedTimerData(
         TimerKey<K> timerKey, long timestamp, TimeDomain domain, Coder<K> keyCoder) {
       byte[] keyBytes = null;
-      if (keyCoder != null && timerKey.key != null) {
+      if (keyCoder != null && timerKey.getKey() != null) {
         final ByteArrayOutputStream baos = new ByteArrayOutputStream();
         try {
-          keyCoder.encode(timerKey.key, baos);
+          keyCoder.encode(timerKey.getKey(), baos);
         } catch (IOException e) {
-          throw new RuntimeException("Could not encode key: " + timerKey.key, e);
+          throw new RuntimeException("Could not encode key: " + timerKey.getKey(), e);
         }
         keyBytes = baos.toByteArray();
       }
 
-      return new KeyedTimerData<K>(
+      return new KeyedTimerData<>(
           keyBytes,
-          timerKey.key,
+          timerKey.getKey(),
           TimerInternals.TimerData.of(
-              timerKey.timerId,
-              timerKey.stateNamespace,
+              timerKey.getTimerId(),
+              timerKey.getTimerFamilyId(),
+              timerKey.getStateNamespace(),
               new Instant(timestamp),
               new Instant(timestamp),
               domain));
     }
 
-    private TimerKey(K key, StateNamespace stateNamespace, String timerId) {
-      this.key = key;
-      this.stateNamespace = stateNamespace;
-      this.timerId = timerId;
-    }
+    @AutoValue.Builder
+    abstract static class Builder<K> {
+      abstract Builder<K> setKey(K key);
 
-    public K getKey() {
-      return key;
-    }
+      abstract Builder<K> setStateNamespace(StateNamespace stateNamespace);
 
-    public StateNamespace getStateNamespace() {
-      return stateNamespace;
-    }
+      abstract Builder<K> setTimerId(String timerId);
 
-    public String getTimerId() {
-      return timerId;
-    }
+      abstract Builder<K> setTimerFamilyId(String timerFamilyId);
 
-    @Override
-    public boolean equals(@Nullable Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-
-      TimerKey<?> timerKey = (TimerKey<?>) o;
-
-      if (key != null ? !key.equals(timerKey.key) : timerKey.key != null) {
-        return false;
-      }
-      if (!stateNamespace.equals(timerKey.stateNamespace)) {
-        return false;
-      }
-
-      return timerId.equals(timerKey.timerId);
-    }
-
-    @Override
-    public int hashCode() {
-      int result = key != null ? key.hashCode() : 0;
-      result = 31 * result + stateNamespace.hashCode();
-      result = 31 * result + timerId.hashCode();
-      return result;
-    }
-
-    @Override
-    public String toString() {
-      return "TimerKey{"
-          + "key="
-          + key
-          + ", stateNamespace="
-          + stateNamespace
-          + ", timerId='"
-          + timerId
-          + '\''
-          + '}';
+      abstract TimerKey<K> build();
     }
   }
 
@@ -682,12 +650,14 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
         throws CoderException, IOException {
 
       // encode the timestamp first
-      STRING_CODER.encode(value.timerId, outStream);
-      STRING_CODER.encode(value.stateNamespace.stringKey(), outStream);
+      STRING_CODER.encode(value.getTimerId(), outStream);
+      STRING_CODER.encode(value.getStateNamespace().stringKey(), outStream);
 
       if (keyCoder != null) {
-        keyCoder.encode(value.key, outStream);
+        keyCoder.encode(value.getKey(), outStream);
       }
+
+      STRING_CODER.encode(value.getTimerFamilyId(), outStream);
     }
 
     @Override
@@ -703,7 +673,16 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
         key = keyCoder.decode(inStream);
       }
 
-      return new TimerKey<>(key, namespace, timerId);
+      // check if the stream has more available bytes. This is to ensure backward compatibility with
+      // old rocksdb state which does not encode timer family data
+      final String timerFamilyId = inStream.available() > 0 ? STRING_CODER.decode(inStream) : "";
+
+      return TimerKey.<K>builder()
+          .setTimerId(timerId)
+          .setStateNamespace(namespace)
+          .setKey(key)
+          .setTimerFamilyId(timerFamilyId)
+          .build();
     }
 
     @Override
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java
index eaeb6b3..3880dd9 100644
--- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java
@@ -70,6 +70,7 @@ import org.rocksdb.WriteOptions;
  */
 @SuppressWarnings({
   "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
 public class SamzaTimerInternalsFactoryTest {
   @Rule public transient TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -302,7 +303,16 @@ public class SamzaTimerInternalsFactoryTest {
             "timer2", nameSpace, new Instant(100), new Instant(100), TimeDomain.PROCESSING_TIME);
     timerInternals.setTimer(timer2);
 
-    assertEquals(2, timerRegistry.timers.size());
+    final TimerInternals.TimerData timer3 =
+        TimerInternals.TimerData.of(
+            "timer3",
+            "timerFamilyId3",
+            nameSpace,
+            new Instant(100),
+            new Instant(100),
+            TimeDomain.PROCESSING_TIME);
+    timerInternals.setTimer(timer3);
+    assertEquals(3, timerRegistry.timers.size());
 
     store.close();
 
@@ -312,14 +322,14 @@ public class SamzaTimerInternalsFactoryTest {
     final SamzaTimerInternalsFactory<String> restoredFactory =
         createTimerInternalsFactory(restoredRegistry, "timer", pipelineOptions, store);
 
-    assertEquals(2, restoredRegistry.timers.size());
+    assertEquals(3, restoredRegistry.timers.size());
 
     final ByteArrayOutputStream baos = new ByteArrayOutputStream();
     StringUtf8Coder.of().encode("testKey", baos);
     final byte[] keyBytes = baos.toByteArray();
-    restoredFactory.removeProcessingTimer(new KeyedTimerData(keyBytes, "testKey", timer1));
-    restoredFactory.removeProcessingTimer(new KeyedTimerData(keyBytes, "testKey", timer2));
-
+    restoredFactory.removeProcessingTimer(new KeyedTimerData<>(keyBytes, "testKey", timer1));
+    restoredFactory.removeProcessingTimer(new KeyedTimerData<>(keyBytes, "testKey", timer2));
+    restoredFactory.removeProcessingTimer(new KeyedTimerData<>(keyBytes, "testKey", timer3));
     store.close();
   }
 

Reply via email to