This is an automated email from the ASF dual-hosted git repository.
xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 67fbf95 [BEAM-12333] Changing TimerKey to include TimerFamilyId
(#14802)
67fbf95 is described below
commit 67fbf9581cd2eb0850e0cdda4463bf0fdbe8bd2d
Author: ajo thomas <[email protected]>
AuthorDate: Mon May 17 16:00:22 2021 -0700
[BEAM-12333] Changing TimerKey to include TimerFamilyId (#14802)
---
.../samza/runtime/SamzaTimerInternalsFactory.java | 125 +++++++++------------
.../runtime/SamzaTimerInternalsFactoryTest.java | 20 +++-
2 files changed, 67 insertions(+), 78 deletions(-)
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
index ad426f2..2588223 100644
---
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.samza.runtime;
+import com.google.auto.value.AutoValue;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -445,7 +446,7 @@ public class SamzaTimerInternalsFactory<K> implements
TimerInternalsFactory<K> {
final Long timestamp = eventTimeTimerState.get(timerKey).read();
if (timestamp != null) {
- final KeyedTimerData keyedTimerDataInStore =
+ final KeyedTimerData<K> keyedTimerDataInStore =
TimerKey.toKeyedTimerData(timerKey, timestamp,
TimeDomain.EVENT_TIME, keyCoder);
timestampSortedEventTimeTimerState.remove(keyedTimerDataInStore);
}
@@ -553,7 +554,7 @@ public class SamzaTimerInternalsFactory<K> implements
TimerInternalsFactory<K> {
// inline the migration code
while (eventTimersIter.hasNext()) {
final Map.Entry<TimerKey<K>, Long> entry = eventTimersIter.next();
- final KeyedTimerData keyedTimerData =
+ final KeyedTimerData<K> keyedTimerData =
TimerKey.toKeyedTimerData(
entry.getKey(), entry.getValue(), TimeDomain.EVENT_TIME,
keyCoder);
timestampSortedEventTimeTimerState.add(keyedTimerData);
@@ -569,99 +570,66 @@ public class SamzaTimerInternalsFactory<K> implements
TimerInternalsFactory<K> {
}
}
- private static class TimerKey<K> {
- private final K key;
- private final StateNamespace stateNamespace;
- private final String timerId;
+ @AutoValue
+ abstract static class TimerKey<K> {
+ abstract K getKey();
+
+ abstract StateNamespace getStateNamespace();
+
+ abstract String getTimerId();
+
+ abstract String getTimerFamilyId();
+
+ static <K> Builder<K> builder() {
+ return new AutoValue_SamzaTimerInternalsFactory_TimerKey.Builder<>();
+ }
static <K> TimerKey<K> of(KeyedTimerData<K> keyedTimerData) {
final TimerInternals.TimerData timerData = keyedTimerData.getTimerData();
- return new TimerKey<>(
- keyedTimerData.getKey(), timerData.getNamespace(),
timerData.getTimerId());
+ return TimerKey.<K>builder()
+ .setKey(keyedTimerData.getKey())
+ .setStateNamespace(timerData.getNamespace())
+ .setTimerId(timerData.getTimerId())
+ .setTimerFamilyId(timerData.getTimerFamilyId())
+ .build();
}
static <K> KeyedTimerData<K> toKeyedTimerData(
TimerKey<K> timerKey, long timestamp, TimeDomain domain, Coder<K>
keyCoder) {
byte[] keyBytes = null;
- if (keyCoder != null && timerKey.key != null) {
+ if (keyCoder != null && timerKey.getKey() != null) {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
- keyCoder.encode(timerKey.key, baos);
+ keyCoder.encode(timerKey.getKey(), baos);
} catch (IOException e) {
- throw new RuntimeException("Could not encode key: " + timerKey.key,
e);
+ throw new RuntimeException("Could not encode key: " +
timerKey.getKey(), e);
}
keyBytes = baos.toByteArray();
}
- return new KeyedTimerData<K>(
+ return new KeyedTimerData<>(
keyBytes,
- timerKey.key,
+ timerKey.getKey(),
TimerInternals.TimerData.of(
- timerKey.timerId,
- timerKey.stateNamespace,
+ timerKey.getTimerId(),
+ timerKey.getTimerFamilyId(),
+ timerKey.getStateNamespace(),
new Instant(timestamp),
new Instant(timestamp),
domain));
}
- private TimerKey(K key, StateNamespace stateNamespace, String timerId) {
- this.key = key;
- this.stateNamespace = stateNamespace;
- this.timerId = timerId;
- }
+ @AutoValue.Builder
+ abstract static class Builder<K> {
+ abstract Builder<K> setKey(K key);
- public K getKey() {
- return key;
- }
+ abstract Builder<K> setStateNamespace(StateNamespace stateNamespace);
- public StateNamespace getStateNamespace() {
- return stateNamespace;
- }
+ abstract Builder<K> setTimerId(String timerId);
- public String getTimerId() {
- return timerId;
- }
+ abstract Builder<K> setTimerFamilyId(String timerFamilyId);
- @Override
- public boolean equals(@Nullable Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- TimerKey<?> timerKey = (TimerKey<?>) o;
-
- if (key != null ? !key.equals(timerKey.key) : timerKey.key != null) {
- return false;
- }
- if (!stateNamespace.equals(timerKey.stateNamespace)) {
- return false;
- }
-
- return timerId.equals(timerKey.timerId);
- }
-
- @Override
- public int hashCode() {
- int result = key != null ? key.hashCode() : 0;
- result = 31 * result + stateNamespace.hashCode();
- result = 31 * result + timerId.hashCode();
- return result;
- }
-
- @Override
- public String toString() {
- return "TimerKey{"
- + "key="
- + key
- + ", stateNamespace="
- + stateNamespace
- + ", timerId='"
- + timerId
- + '\''
- + '}';
+ abstract TimerKey<K> build();
}
}
@@ -682,12 +650,14 @@ public class SamzaTimerInternalsFactory<K> implements
TimerInternalsFactory<K> {
throws CoderException, IOException {
// encode the timestamp first
- STRING_CODER.encode(value.timerId, outStream);
- STRING_CODER.encode(value.stateNamespace.stringKey(), outStream);
+ STRING_CODER.encode(value.getTimerId(), outStream);
+ STRING_CODER.encode(value.getStateNamespace().stringKey(), outStream);
if (keyCoder != null) {
- keyCoder.encode(value.key, outStream);
+ keyCoder.encode(value.getKey(), outStream);
}
+
+ STRING_CODER.encode(value.getTimerFamilyId(), outStream);
}
@Override
@@ -703,7 +673,16 @@ public class SamzaTimerInternalsFactory<K> implements
TimerInternalsFactory<K> {
key = keyCoder.decode(inStream);
}
- return new TimerKey<>(key, namespace, timerId);
+ // check if the stream has more available bytes. This is to ensure
backward compatibility with
+ // old rocksdb state which does not encode timer family data
+ final String timerFamilyId = inStream.available() > 0 ?
STRING_CODER.decode(inStream) : "";
+
+ return TimerKey.<K>builder()
+ .setTimerId(timerId)
+ .setStateNamespace(namespace)
+ .setKey(key)
+ .setTimerFamilyId(timerFamilyId)
+ .build();
}
@Override
diff --git
a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java
b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java
index eaeb6b3..3880dd9 100644
---
a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java
+++
b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java
@@ -70,6 +70,7 @@ import org.rocksdb.WriteOptions;
*/
@SuppressWarnings({
"rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
+ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
public class SamzaTimerInternalsFactoryTest {
@Rule public transient TemporaryFolder temporaryFolder = new
TemporaryFolder();
@@ -302,7 +303,16 @@ public class SamzaTimerInternalsFactoryTest {
"timer2", nameSpace, new Instant(100), new Instant(100),
TimeDomain.PROCESSING_TIME);
timerInternals.setTimer(timer2);
- assertEquals(2, timerRegistry.timers.size());
+ final TimerInternals.TimerData timer3 =
+ TimerInternals.TimerData.of(
+ "timer3",
+ "timerFamilyId3",
+ nameSpace,
+ new Instant(100),
+ new Instant(100),
+ TimeDomain.PROCESSING_TIME);
+ timerInternals.setTimer(timer3);
+ assertEquals(3, timerRegistry.timers.size());
store.close();
@@ -312,14 +322,14 @@ public class SamzaTimerInternalsFactoryTest {
final SamzaTimerInternalsFactory<String> restoredFactory =
createTimerInternalsFactory(restoredRegistry, "timer",
pipelineOptions, store);
- assertEquals(2, restoredRegistry.timers.size());
+ assertEquals(3, restoredRegistry.timers.size());
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
StringUtf8Coder.of().encode("testKey", baos);
final byte[] keyBytes = baos.toByteArray();
- restoredFactory.removeProcessingTimer(new KeyedTimerData(keyBytes,
"testKey", timer1));
- restoredFactory.removeProcessingTimer(new KeyedTimerData(keyBytes,
"testKey", timer2));
-
+ restoredFactory.removeProcessingTimer(new KeyedTimerData<>(keyBytes,
"testKey", timer1));
+ restoredFactory.removeProcessingTimer(new KeyedTimerData<>(keyBytes,
"testKey", timer2));
+ restoredFactory.removeProcessingTimer(new KeyedTimerData<>(keyBytes,
"testKey", timer3));
store.close();
}