This is an automated email from the ASF dual-hosted git repository. zakelly pushed a commit to branch release-2.0 in repository https://gitbox.apache.org/repos/asf/flink.git
commit f335f1f7b7c370c2b1023f49d8c9f16002fbec33 Author: Zakelly <[email protected]> AuthorDate: Fri Feb 7 16:58:27 2025 +0800 [hotfix] Add constructors for state v2 descriptors that accept type class --- .../common/state/v2/AggregatingStateDescriptor.java | 21 +++++++++++++++++++-- .../api/common/state/v2/ListStateDescriptor.java | 13 +++++++++++++ .../api/common/state/v2/MapStateDescriptor.java | 16 ++++++++++++++++ .../common/state/v2/ReducingStateDescriptor.java | 17 +++++++++++++++++ .../flink/api/common/state/v2/StateDescriptor.java | 15 +++++++++++++++ .../common/state/v2/StateSerializerReference.java | 17 +++++++++++++++++ .../api/common/state/v2/ValueStateDescriptor.java | 13 +++++++++++++ 7 files changed, 110 insertions(+), 2 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/v2/AggregatingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/AggregatingStateDescriptor.java index 26af7de75f2..56fcd6b4b45 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/v2/AggregatingStateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/AggregatingStateDescriptor.java @@ -43,7 +43,7 @@ public class AggregatingStateDescriptor<IN, ACC, OUT> extends StateDescriptor<AC private final AggregateFunction<IN, ACC, OUT> aggregateFunction; /** - * Create a new state descriptor with the given name, function, and type. + * Create a new {@code AggregatingStateDescriptor} with the given name, function, and type. * * @param stateId The (unique) name for the state. * @param aggregateFunction The {@code AggregateFunction} used to aggregate the state. @@ -58,7 +58,7 @@ public class AggregatingStateDescriptor<IN, ACC, OUT> extends StateDescriptor<AC } /** - * Create a new {@code ReducingStateDescriptor} with the given stateId and the given type + * Create a new {@code AggregatingStateDescriptor} with the given stateId and the given type * serializer. * * @param stateId The (unique) stateId for the state. @@ -72,6 +72,23 @@ public class AggregatingStateDescriptor<IN, ACC, OUT> extends StateDescriptor<AC this.aggregateFunction = checkNotNull(aggregateFunction); } + /** + * Creates a new {@code AggregatingStateDescriptor} with the given name, function, and type. + * + * <p>If this constructor fails (because it is not possible to describe the type via a class), + * consider using the {@link #AggregatingStateDescriptor(String, AggregateFunction, + * TypeInformation)} constructor. + * + * @param name The (unique) name for the state. + * @param aggFunction The {@code AggregateFunction} used to aggregate the state. + * @param stateType The type of the accumulator. The accumulator is stored in the state. + */ + public AggregatingStateDescriptor( + String name, AggregateFunction<IN, ACC, OUT> aggFunction, Class<ACC> stateType) { + super(name, stateType); + this.aggregateFunction = checkNotNull(aggFunction); + } + /** Returns the Aggregate function for this state. */ public AggregateFunction<IN, ACC, OUT> getAggregateFunction() { return aggregateFunction; diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/v2/ListStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/ListStateDescriptor.java index 1409e6bc52e..5c6d3e05f30 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/v2/ListStateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/ListStateDescriptor.java @@ -54,6 +54,19 @@ public class ListStateDescriptor<T> extends StateDescriptor<T> { super(stateId, serializer); } + /** + * Creates a new {@code ListStateDescriptor} with the given name and list element type. + * + * <p>If this constructor fails (because it is not possible to describe the type via a class), + * consider using the {@link #ListStateDescriptor(String, TypeInformation)} constructor. + * + * @param name The (unique) name for the state. + * @param elementTypeClass The type of the elements in the state. + */ + public ListStateDescriptor(String name, Class<T> elementTypeClass) { + super(name, elementTypeClass); + } + @Override public Type getType() { return Type.LIST; diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/v2/MapStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/MapStateDescriptor.java index adfe55a5c83..a2575a45c44 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/v2/MapStateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/MapStateDescriptor.java @@ -71,6 +71,22 @@ public class MapStateDescriptor<UK, UV> extends StateDescriptor<UV> { this.userKeySerializer = new StateSerializerReference<>(userKeySerializer); } + /** + * Create a new {@code MapStateDescriptor} with the given name and the given type information. + * + * <p>If this constructor fails (because it is not possible to describe the type via a class), + * consider using the {@link #MapStateDescriptor(String, TypeInformation, TypeInformation)} + * constructor. + * + * @param name The name of the {@code MapStateDescriptor}. + * @param keyClass The class of the type of keys in the state. + * @param valueClass The class of the type of values in the state. + */ + public MapStateDescriptor(String name, Class<UK> keyClass, Class<UV> valueClass) { + super(name, valueClass); + this.userKeySerializer = new StateSerializerReference<>(keyClass); + } + @Nonnull public TypeSerializer<UK> getUserKeySerializer() { TypeSerializer<UK> serializer = userKeySerializer.get(); diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/v2/ReducingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/ReducingStateDescriptor.java index f3bbf18fbe1..c68fbe0a3a0 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/v2/ReducingStateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/ReducingStateDescriptor.java @@ -67,6 +67,23 @@ public class ReducingStateDescriptor<T> extends StateDescriptor<T> { this.reduceFunction = checkNotNull(reduceFunction); } + /** + * Creates a new {@code ReducingStateDescriptor} with the given name, type, and default value. + * + * <p>If this constructor fails (because it is not possible to describe the type via a class), + * consider using the {@link #ReducingStateDescriptor(String, ReduceFunction, TypeInformation)} + * constructor. + * + * @param name The (unique) name for the state. + * @param reduceFunction The {@code ReduceFunction} used to aggregate the state. + * @param typeClass The type of the values in the state. + */ + public ReducingStateDescriptor( + String name, ReduceFunction<T> reduceFunction, Class<T> typeClass) { + super(name, typeClass); + this.reduceFunction = checkNotNull(reduceFunction); + } + /** Returns the reduce function to be used for the reducing state. */ public ReduceFunction<T> getReduceFunction() { return reduceFunction; diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/v2/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/StateDescriptor.java index 3b16231462c..98c4ba07447 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/v2/StateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/StateDescriptor.java @@ -87,6 +87,21 @@ public abstract class StateDescriptor<T> implements Serializable { this.typeSerializer = new StateSerializerReference<>(serializer); } + /** + * Create a new {@code StateDescriptor} with the given name and the given type information. + * + * <p>If this constructor fails (because it is not possible to describe the type via a class), + * consider using the {@link #StateDescriptor(String, TypeInformation)} constructor. + * + * @param name The name of the {@code StateDescriptor}. + * @param type The class of the type of values in the state. + */ + protected StateDescriptor(String name, Class<T> type) { + this.stateId = checkNotNull(name, "name must not be null"); + checkNotNull(type, "type class must not be null"); + this.typeSerializer = new StateSerializerReference<>(type); + } + // ------------------------------------------------------------------------ /** diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/v2/StateSerializerReference.java b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/StateSerializerReference.java index d71c949301b..7fb658cce34 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/v2/StateSerializerReference.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/StateSerializerReference.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.SerializerFactory; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.TypeExtractor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,6 +61,22 @@ class StateSerializerReference<T> extends AtomicReference<TypeSerializer<T>> { this.typeInfo = null; } + public StateSerializerReference(Class<T> clazz) { + try { + this.typeInfo = TypeExtractor.createTypeInfo(clazz); + } catch (Exception e) { + throw new RuntimeException( + "Could not create the type information for '" + + clazz.getName() + + "'. " + + "The most common reason is failure to infer the generic type information, due to Java's type erasure. " + + "In that case, please pass a 'TypeHint' instead of a class to describe the type. " + + "For example, to describe 'Tuple2<String, String>' as a generic type, use " + + "'new PravegaDeserializationSchema<>(new TypeHint<Tuple2<String, String>>(){}, serializer);'", + e); + } + } + public TypeInformation<T> getTypeInformation() { return typeInfo; } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/v2/ValueStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/ValueStateDescriptor.java index dc5b1251cbd..6d266c9124b 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/v2/ValueStateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/v2/ValueStateDescriptor.java @@ -54,6 +54,19 @@ public class ValueStateDescriptor<T> extends StateDescriptor<T> { super(stateId, serializer); } + /** + * Creates a new {@code ValueStateDescriptor} with the given name and type + * + * <p>If this constructor fails (because it is not possible to describe the type via a class), + * consider using the {@link #ValueStateDescriptor(String, TypeInformation)} constructor. + * + * @param stateId The (unique) name for the state. + * @param typeClass The type of the values in the state. + */ + public ValueStateDescriptor(@Nonnull String stateId, @Nonnull Class<T> typeClass) { + super(stateId, typeClass); + } + @Override public Type getType() { return Type.VALUE;
