This is an automated email from the ASF dual-hosted git repository. sjwiesman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit dc929845631780bf2aec71831fbd4ad8ac51e3bf Author: Igal Shilman <[email protected]> AuthorDate: Fri May 15 13:27:40 2020 +0200 [FLINK-17644] Add Expiration to the Persisted states --- .../sdk/state/PersistedAppendingBuffer.java | 30 ++++++++++++++++++-- .../flink/statefun/sdk/state/PersistedTable.java | 33 ++++++++++++++++++++-- .../flink/statefun/sdk/state/PersistedValue.java | 25 ++++++++++++++-- 3 files changed, 82 insertions(+), 6 deletions(-) diff --git a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/PersistedAppendingBuffer.java b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/PersistedAppendingBuffer.java index d5c6081..42032c5 100644 --- a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/PersistedAppendingBuffer.java +++ b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/PersistedAppendingBuffer.java @@ -40,12 +40,17 @@ import org.apache.flink.statefun.sdk.annotations.Persisted; public final class PersistedAppendingBuffer<E> { private final String name; private final Class<E> elementType; + private final Expiration expiration; private AppendingBufferAccessor<E> accessor; private PersistedAppendingBuffer( - String name, Class<E> elementType, AppendingBufferAccessor<E> accessor) { + String name, + Class<E> elementType, + Expiration expiration, + AppendingBufferAccessor<E> accessor) { this.name = Objects.requireNonNull(name); this.elementType = Objects.requireNonNull(elementType); + this.expiration = Objects.requireNonNull(expiration); this.accessor = Objects.requireNonNull(accessor); } @@ -60,7 +65,24 @@ public final class PersistedAppendingBuffer<E> { * @return a {@code PersistedAppendingBuffer} instance. 
*/ public static <E> PersistedAppendingBuffer<E> of(String name, Class<E> elementType) { - return new PersistedAppendingBuffer<>(name, elementType, new NonFaultTolerantAccessor<>()); + return of(name, elementType, Expiration.none()); + } + + /** + * Creates a {@link PersistedAppendingBuffer} instance that may be used to access persisted state + * managed by the system. Access to the persisted buffer is identified by a unique name and type + * of the elements. These may not change across multiple executions of the application. + * + * @param name the unique name of the persisted buffer state. + * @param elementType the type of the elements of this {@code PersistedAppendingBuffer}. + * @param expiration state expiration configuration. + * @param <E> the type of the elements. + * @return a {@code PersistedAppendingBuffer} instance. + */ + public static <E> PersistedAppendingBuffer<E> of( + String name, Class<E> elementType, Expiration expiration) { + return new PersistedAppendingBuffer<>( + name, elementType, expiration, new NonFaultTolerantAccessor<>()); } /** @@ -81,6 +103,10 @@ public final class PersistedAppendingBuffer<E> { return elementType; } + public Expiration expiration() { + return expiration; + } + /** * Appends an element to the persisted buffer. 
* diff --git a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/PersistedTable.java b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/PersistedTable.java index b2dff12..a04c126 100644 --- a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/PersistedTable.java +++ b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/PersistedTable.java @@ -40,13 +40,19 @@ public final class PersistedTable<K, V> { private final String name; private final Class<K> keyType; private final Class<V> valueType; + private final Expiration expiration; private TableAccessor<K, V> accessor; private PersistedTable( - String name, Class<K> keyType, Class<V> valueType, TableAccessor<K, V> accessor) { + String name, + Class<K> keyType, + Class<V> valueType, + Expiration expiration, + TableAccessor<K, V> accessor) { this.name = Objects.requireNonNull(name); this.keyType = Objects.requireNonNull(keyType); this.valueType = Objects.requireNonNull(valueType); + this.expiration = Objects.requireNonNull(expiration); this.accessor = Objects.requireNonNull(accessor); } @@ -63,7 +69,26 @@ public final class PersistedTable<K, V> { * @return a {@code PersistedTable} instance. */ public static <K, V> PersistedTable<K, V> of(String name, Class<K> keyType, Class<V> valueType) { - return new PersistedTable<>(name, keyType, valueType, new NonFaultTolerantAccessor<>()); + return of(name, keyType, valueType, Expiration.none()); + } + + /** + * Creates a {@link PersistedTable} instance that may be used to access persisted state managed by + * the system. Access to the persisted table is identified by a unique name, type of the key, and + * type of the value. These may not change across multiple executions of the application. + * + * @param name the unique name of the persisted state. + * @param keyType the type of the state keys of this {@code PersistedTable}. + * @param valueType the type of the state values of this {@code PersistedTable}. 
+ * @param expiration state expiration configuration. + * @param <K> the type of the state keys. + * @param <V> the type of the state values. + * @return a {@code PersistedTable} instance. + */ + public static <K, V> PersistedTable<K, V> of( + String name, Class<K> keyType, Class<V> valueType, Expiration expiration) { + return new PersistedTable<>( + name, keyType, valueType, expiration, new NonFaultTolerantAccessor<>()); } /** @@ -93,6 +118,10 @@ public final class PersistedTable<K, V> { return valueType; } + public Expiration expiration() { + return expiration; + } + /** * Returns a persisted table's value. * diff --git a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/PersistedValue.java b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/PersistedValue.java index a656e72..3d78b75 100644 --- a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/PersistedValue.java +++ b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/PersistedValue.java @@ -37,11 +37,13 @@ import org.apache.flink.statefun.sdk.annotations.Persisted; public final class PersistedValue<T> { private final String name; private final Class<T> type; + private final Expiration expiration; private Accessor<T> accessor; - private PersistedValue(String name, Class<T> type, Accessor<T> accessor) { + private PersistedValue(String name, Class<T> type, Expiration expiration, Accessor<T> accessor) { this.name = Objects.requireNonNull(name); this.type = Objects.requireNonNull(type); + this.expiration = Objects.requireNonNull(expiration); this.accessor = Objects.requireNonNull(accessor); } @@ -56,7 +58,22 @@ public final class PersistedValue<T> { * @return a {@code PersistedValue} instance. 
*/ public static <T> PersistedValue<T> of(String name, Class<T> type) { - return new PersistedValue<>(name, type, new NonFaultTolerantAccessor<>()); + return of(name, type, Expiration.none()); + } + + /** + * Creates a {@link PersistedValue} instance that may be used to access persisted state managed by + * the system. Access to the persisted value is identified by a unique name and type of the + * value. These may not change across multiple executions of the application. + * + * @param name the unique name of the persisted state. + * @param type the type of the state values of this {@code PersistedValue}. + * @param expiration state expiration configuration. + * @param <T> the type of the state values. + * @return a {@code PersistedValue} instance. + */ + public static <T> PersistedValue<T> of(String name, Class<T> type, Expiration expiration) { + return new PersistedValue<>(name, type, expiration, new NonFaultTolerantAccessor<>()); } /** @@ -77,6 +94,10 @@ public final class PersistedValue<T> { return type; } + public Expiration expiration() { + return expiration; + } + /** * Returns the persisted value. *
