This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f335f1f7b7c370c2b1023f49d8c9f16002fbec33
Author: Zakelly <[email protected]>
AuthorDate: Fri Feb 7 16:58:27 2025 +0800

    [hotfix] Add constructors for state v2 descriptors that accept type class
---
 .../common/state/v2/AggregatingStateDescriptor.java | 21 +++++++++++++++++++--
 .../api/common/state/v2/ListStateDescriptor.java    | 13 +++++++++++++
 .../api/common/state/v2/MapStateDescriptor.java     | 16 ++++++++++++++++
 .../common/state/v2/ReducingStateDescriptor.java    | 17 +++++++++++++++++
 .../flink/api/common/state/v2/StateDescriptor.java  | 15 +++++++++++++++
 .../common/state/v2/StateSerializerReference.java   | 17 +++++++++++++++++
 .../api/common/state/v2/ValueStateDescriptor.java   | 13 +++++++++++++
 7 files changed, 110 insertions(+), 2 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/v2/AggregatingStateDescriptor.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/AggregatingStateDescriptor.java
index 26af7de75f2..56fcd6b4b45 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/v2/AggregatingStateDescriptor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/AggregatingStateDescriptor.java
@@ -43,7 +43,7 @@ public class AggregatingStateDescriptor<IN, ACC, OUT> extends 
StateDescriptor<AC
     private final AggregateFunction<IN, ACC, OUT> aggregateFunction;
 
     /**
-     * Create a new state descriptor with the given name, function, and type.
+     * Create a new {@code AggregatingStateDescriptor} with the given name, 
function, and type.
      *
      * @param stateId The (unique) name for the state.
      * @param aggregateFunction The {@code AggregateFunction} used to 
aggregate the state.
@@ -58,7 +58,7 @@ public class AggregatingStateDescriptor<IN, ACC, OUT> extends 
StateDescriptor<AC
     }
 
     /**
-     * Create a new {@code ReducingStateDescriptor} with the given stateId and 
the given type
+     * Create a new {@code AggregatingStateDescriptor} with the given stateId 
and the given type
      * serializer.
      *
      * @param stateId The (unique) stateId for the state.
@@ -72,6 +72,23 @@ public class AggregatingStateDescriptor<IN, ACC, OUT> 
extends StateDescriptor<AC
         this.aggregateFunction = checkNotNull(aggregateFunction);
     }
 
+    /**
+     * Creates a new {@code AggregatingStateDescriptor} with the given name, 
function, and type.
+     *
+     * <p>If this constructor fails (because it is not possible to describe 
the type via a class),
+     * consider using the {@link #AggregatingStateDescriptor(String, 
AggregateFunction,
+     * TypeInformation)} constructor.
+     *
+     * @param name The (unique) name for the state.
+     * @param aggFunction The {@code AggregateFunction} used to aggregate the 
state.
+     * @param stateType The type of the accumulator. The accumulator is stored 
in the state.
+     */
+    public AggregatingStateDescriptor(
+            String name, AggregateFunction<IN, ACC, OUT> aggFunction, 
Class<ACC> stateType) {
+        super(name, stateType);
+        this.aggregateFunction = checkNotNull(aggFunction);
+    }
+
     /** Returns the Aggregate function for this state. */
     public AggregateFunction<IN, ACC, OUT> getAggregateFunction() {
         return aggregateFunction;
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/v2/ListStateDescriptor.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/ListStateDescriptor.java
index 1409e6bc52e..5c6d3e05f30 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/v2/ListStateDescriptor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/ListStateDescriptor.java
@@ -54,6 +54,19 @@ public class ListStateDescriptor<T> extends 
StateDescriptor<T> {
         super(stateId, serializer);
     }
 
+    /**
+     * Creates a new {@code ListStateDescriptor} with the given name and list 
element type.
+     *
+     * <p>If this constructor fails (because it is not possible to describe 
the type via a class),
+     * consider using the {@link #ListStateDescriptor(String, 
TypeInformation)} constructor.
+     *
+     * @param name The (unique) name for the state.
+     * @param elementTypeClass The type of the elements in the state.
+     */
+    public ListStateDescriptor(String name, Class<T> elementTypeClass) {
+        super(name, elementTypeClass);
+    }
+
     @Override
     public Type getType() {
         return Type.LIST;
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/v2/MapStateDescriptor.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/MapStateDescriptor.java
index adfe55a5c83..a2575a45c44 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/v2/MapStateDescriptor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/MapStateDescriptor.java
@@ -71,6 +71,22 @@ public class MapStateDescriptor<UK, UV> extends 
StateDescriptor<UV> {
         this.userKeySerializer = new 
StateSerializerReference<>(userKeySerializer);
     }
 
+    /**
+     * Create a new {@code MapStateDescriptor} with the given name and the 
given type information.
+     *
+     * <p>If this constructor fails (because it is not possible to describe 
the type via a class),
+     * consider using the {@link #MapStateDescriptor(String, TypeInformation, 
TypeInformation)}
+     * constructor.
+     *
+     * @param name The name of the {@code MapStateDescriptor}.
+     * @param keyClass The class of the type of keys in the state.
+     * @param valueClass The class of the type of values in the state.
+     */
+    public MapStateDescriptor(String name, Class<UK> keyClass, Class<UV> 
valueClass) {
+        super(name, valueClass);
+        this.userKeySerializer = new StateSerializerReference<>(keyClass);
+    }
+
     @Nonnull
     public TypeSerializer<UK> getUserKeySerializer() {
         TypeSerializer<UK> serializer = userKeySerializer.get();
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/v2/ReducingStateDescriptor.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/ReducingStateDescriptor.java
index f3bbf18fbe1..c68fbe0a3a0 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/v2/ReducingStateDescriptor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/ReducingStateDescriptor.java
@@ -67,6 +67,23 @@ public class ReducingStateDescriptor<T> extends 
StateDescriptor<T> {
         this.reduceFunction = checkNotNull(reduceFunction);
     }
 
+    /**
+     * Creates a new {@code ReducingStateDescriptor} with the given name, 
type, and default value.
+     *
+     * <p>If this constructor fails (because it is not possible to describe 
the type via a class),
+     * consider using the {@link #ReducingStateDescriptor(String, 
ReduceFunction, TypeInformation)}
+     * constructor.
+     *
+     * @param name The (unique) name for the state.
+     * @param reduceFunction The {@code ReduceFunction} used to aggregate the 
state.
+     * @param typeClass The type of the values in the state.
+     */
+    public ReducingStateDescriptor(
+            String name, ReduceFunction<T> reduceFunction, Class<T> typeClass) 
{
+        super(name, typeClass);
+        this.reduceFunction = checkNotNull(reduceFunction);
+    }
+
     /** Returns the reduce function to be used for the reducing state. */
     public ReduceFunction<T> getReduceFunction() {
         return reduceFunction;
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/v2/StateDescriptor.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/StateDescriptor.java
index 3b16231462c..98c4ba07447 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/v2/StateDescriptor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/StateDescriptor.java
@@ -87,6 +87,21 @@ public abstract class StateDescriptor<T> implements 
Serializable {
         this.typeSerializer = new StateSerializerReference<>(serializer);
     }
 
+    /**
+     * Create a new {@code StateDescriptor} with the given name and the given 
type information.
+     *
+     * <p>If this constructor fails (because it is not possible to describe 
the type via a class),
+     * consider using the {@link #StateDescriptor(String, TypeInformation)} 
constructor.
+     *
+     * @param name The name of the {@code StateDescriptor}.
+     * @param type The class of the type of values in the state.
+     */
+    protected StateDescriptor(String name, Class<T> type) {
+        this.stateId = checkNotNull(name, "name must not be null");
+        checkNotNull(type, "type class must not be null");
+        this.typeSerializer = new StateSerializerReference<>(type);
+    }
+
     // ------------------------------------------------------------------------
 
     /**
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/v2/StateSerializerReference.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/StateSerializerReference.java
index d71c949301b..7fb658cce34 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/v2/StateSerializerReference.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/StateSerializerReference.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.SerializerFactory;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,6 +61,22 @@ class StateSerializerReference<T> extends 
AtomicReference<TypeSerializer<T>> {
         this.typeInfo = null;
     }
 
+    public StateSerializerReference(Class<T> clazz) {
+        try {
+            this.typeInfo = TypeExtractor.createTypeInfo(clazz);
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Could not create the type information for '"
+                            + clazz.getName()
+                            + "'. "
+                            + "The most common reason is failure to infer the 
generic type information, due to Java's type erasure. "
+                            + "In that case, please pass a 'TypeHint' instead 
of a class to describe the type. "
+                            + "For example, to describe 'Tuple2<String, 
String>' as a generic type, use "
+                            + "'new PravegaDeserializationSchema<>(new 
TypeHint<Tuple2<String, String>>(){}, serializer);'",
+                    e);
+        }
+    }
+
     public TypeInformation<T> getTypeInformation() {
         return typeInfo;
     }
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/v2/ValueStateDescriptor.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/ValueStateDescriptor.java
index dc5b1251cbd..6d266c9124b 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/v2/ValueStateDescriptor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/ValueStateDescriptor.java
@@ -54,6 +54,19 @@ public class ValueStateDescriptor<T> extends 
StateDescriptor<T> {
         super(stateId, serializer);
     }
 
+    /**
+     * Creates a new {@code ValueStateDescriptor} with the given name and type
+     *
+     * <p>If this constructor fails (because it is not possible to describe 
the type via a class),
+     * consider using the {@link #ValueStateDescriptor(String, 
TypeInformation)} constructor.
+     *
+     * @param stateId The (unique) name for the state.
+     * @param typeClass The type of the values in the state.
+     */
+    public ValueStateDescriptor(@Nonnull String stateId, @Nonnull Class<T> 
typeClass) {
+        super(stateId, typeClass);
+    }
+
     @Override
     public Type getType() {
         return Type.VALUE;

Reply via email to