This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit dc929845631780bf2aec71831fbd4ad8ac51e3bf
Author: Igal Shilman <[email protected]>
AuthorDate: Fri May 15 13:27:40 2020 +0200

    [FLINK-17644] Add Expiration to the Persisted states
---
 .../sdk/state/PersistedAppendingBuffer.java        | 30 ++++++++++++++++++--
 .../flink/statefun/sdk/state/PersistedTable.java   | 33 ++++++++++++++++++++--
 .../flink/statefun/sdk/state/PersistedValue.java   | 25 ++++++++++++++--
 3 files changed, 82 insertions(+), 6 deletions(-)

diff --git 
a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/PersistedAppendingBuffer.java
 
b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/PersistedAppendingBuffer.java
index d5c6081..42032c5 100644
--- 
a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/PersistedAppendingBuffer.java
+++ 
b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/PersistedAppendingBuffer.java
@@ -40,12 +40,17 @@ import org.apache.flink.statefun.sdk.annotations.Persisted;
 public final class PersistedAppendingBuffer<E> {
   private final String name;
   private final Class<E> elementType;
+  private final Expiration expiration;
   private AppendingBufferAccessor<E> accessor;
 
   private PersistedAppendingBuffer(
-      String name, Class<E> elementType, AppendingBufferAccessor<E> accessor) {
+      String name,
+      Class<E> elementType,
+      Expiration expiration,
+      AppendingBufferAccessor<E> accessor) {
     this.name = Objects.requireNonNull(name);
     this.elementType = Objects.requireNonNull(elementType);
+    this.expiration = Objects.requireNonNull(expiration);
     this.accessor = Objects.requireNonNull(accessor);
   }
 
@@ -60,7 +65,24 @@ public final class PersistedAppendingBuffer<E> {
    * @return a {@code PersistedAppendingBuffer} instance.
    */
   public static <E> PersistedAppendingBuffer<E> of(String name, Class<E> 
elementType) {
-    return new PersistedAppendingBuffer<>(name, elementType, new 
NonFaultTolerantAccessor<>());
+    return of(name, elementType, Expiration.none());
+  }
+
+  /**
+   * Creates a {@link PersistedAppendingBuffer} instance that may be used to 
access persisted state
+   * managed by the system. Access to the persisted buffer is identified by a 
unique name and type
+   * of the elements. These may not change across multiple executions of the 
application.
+   *
+   * @param name the unique name of the persisted buffer state
+   * @param elementType the type of the elements of this {@code 
PersistedAppendingBuffer}.
+   * @param expiration state expiration configuration.
+   * @param <E> the type of the elements.
+   * @return a {@code PersistedAppendingBuffer} instance.
+   */
+  public static <E> PersistedAppendingBuffer<E> of(
+      String name, Class<E> elementType, Expiration expiration) {
+    return new PersistedAppendingBuffer<>(
+        name, elementType, expiration, new NonFaultTolerantAccessor<>());
   }
 
   /**
@@ -81,6 +103,10 @@ public final class PersistedAppendingBuffer<E> {
     return elementType;
   }
 
+  public Expiration expiration() {
+    return expiration;
+  }
+
   /**
    * Appends an element to the persisted buffer.
    *
diff --git 
a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/PersistedTable.java
 
b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/PersistedTable.java
index b2dff12..a04c126 100644
--- 
a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/PersistedTable.java
+++ 
b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/PersistedTable.java
@@ -40,13 +40,19 @@ public final class PersistedTable<K, V> {
   private final String name;
   private final Class<K> keyType;
   private final Class<V> valueType;
+  private final Expiration expiration;
   private TableAccessor<K, V> accessor;
 
   private PersistedTable(
-      String name, Class<K> keyType, Class<V> valueType, TableAccessor<K, V> 
accessor) {
+      String name,
+      Class<K> keyType,
+      Class<V> valueType,
+      Expiration expiration,
+      TableAccessor<K, V> accessor) {
     this.name = Objects.requireNonNull(name);
     this.keyType = Objects.requireNonNull(keyType);
     this.valueType = Objects.requireNonNull(valueType);
+    this.expiration = Objects.requireNonNull(expiration);
     this.accessor = Objects.requireNonNull(accessor);
   }
 
@@ -63,7 +69,26 @@ public final class PersistedTable<K, V> {
    * @return a {@code PersistedTable} instance.
    */
   public static <K, V> PersistedTable<K, V> of(String name, Class<K> keyType, 
Class<V> valueType) {
-    return new PersistedTable<>(name, keyType, valueType, new 
NonFaultTolerantAccessor<>());
+    return of(name, keyType, valueType, Expiration.none());
+  }
+
+  /**
+   * Creates a {@link PersistedTable} instance that may be used to access 
persisted state managed by
+   * the system. Access to the persisted table is identified by a unique 
name, type of the key, and
+   * type of the value. These may not change across multiple executions of the 
application.
+   *
+   * @param name the unique name of the persisted state.
+   * @param keyType the type of the state keys of this {@code PersistedTable}.
+   * @param valueType the type of the state values of this {@code 
PersistedTable}.
+   * @param expiration state expiration configuration.
+   * @param <K> the type of the state keys.
+   * @param <V> the type of the state values.
+   * @return a {@code PersistedTable} instance.
+   */
+  public static <K, V> PersistedTable<K, V> of(
+      String name, Class<K> keyType, Class<V> valueType, Expiration 
expiration) {
+    return new PersistedTable<>(
+        name, keyType, valueType, expiration, new 
NonFaultTolerantAccessor<>());
   }
 
   /**
@@ -93,6 +118,10 @@ public final class PersistedTable<K, V> {
     return valueType;
   }
 
+  public Expiration expiration() {
+    return expiration;
+  }
+
   /**
    * Returns a persisted table's value.
    *
diff --git 
a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/PersistedValue.java
 
b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/PersistedValue.java
index a656e72..3d78b75 100644
--- 
a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/PersistedValue.java
+++ 
b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/PersistedValue.java
@@ -37,11 +37,13 @@ import org.apache.flink.statefun.sdk.annotations.Persisted;
 public final class PersistedValue<T> {
   private final String name;
   private final Class<T> type;
+  private final Expiration expiration;
   private Accessor<T> accessor;
 
-  private PersistedValue(String name, Class<T> type, Accessor<T> accessor) {
+  private PersistedValue(String name, Class<T> type, Expiration expiration, 
Accessor<T> accessor) {
     this.name = Objects.requireNonNull(name);
     this.type = Objects.requireNonNull(type);
+    this.expiration = Objects.requireNonNull(expiration);
     this.accessor = Objects.requireNonNull(accessor);
   }
 
@@ -56,7 +58,22 @@ public final class PersistedValue<T> {
    * @return a {@code PersistedValue} instance.
    */
   public static <T> PersistedValue<T> of(String name, Class<T> type) {
-    return new PersistedValue<>(name, type, new NonFaultTolerantAccessor<>());
+    return of(name, type, Expiration.none());
+  }
+
+  /**
+   * Creates a {@link PersistedValue} instance that may be used to access 
persisted state managed by
+   * the system. Access to the persisted value is identified by a unique name 
and type of the
+   * value. These may not change across multiple executions of the application.
+   *
+   * @param name the unique name of the persisted state.
+   * @param type the type of the state values of this {@code PersistedValue}.
+   * @param expiration state expiration configuration.
+   * @param <T> the type of the state values.
+   * @return a {@code PersistedValue} instance.
+   */
+  public static <T> PersistedValue<T> of(String name, Class<T> type, 
Expiration expiration) {
+    return new PersistedValue<>(name, type, expiration, new 
NonFaultTolerantAccessor<>());
   }
 
   /**
@@ -77,6 +94,10 @@ public final class PersistedValue<T> {
     return type;
   }
 
+  public Expiration expiration() {
+    return expiration;
+  }
+
   /**
    * Returns the persisted value.
    *

Reply via email to