This is an automated email from the ASF dual-hosted git repository.

gaborgsomogyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 73fe043723d [FLINK-38807][state] Add state SQL metadata based type 
inference
73fe043723d is described below

commit 73fe043723dc09baed111fc54c7864747fcba2cf
Author: Gabor Somogyi <[email protected]>
AuthorDate: Mon Feb 23 19:12:52 2026 +0100

    [FLINK-38807][state] Add state SQL metadata based type inference
---
 docs/content/docs/libs/state_processor_api.md      |  10 +-
 .../flink/state/api/runtime/SavepointLoader.java   |  80 +++-
 .../table/SavepointDataStreamScanProvider.java     |  18 +-
 .../table/SavepointDynamicTableSourceFactory.java  | 353 +++++----------
 .../state/table/SavepointTypeInfoResolver.java     | 494 +++++++++++++++++++++
 .../state/table/StateValueColumnConfiguration.java |  22 +-
 ...GenericAvroSavepointTypeInformationFactory.java |  32 --
 .../table/SavepointDynamicTableSourceTest.java     |  34 +-
 .../SavepointMetadataDynamicTableSourceTest.java   |   8 +-
 .../table/SavepointTypeInformationFactoryTest.java | 129 ++++++
 ...pecificAvroSavepointTypeInformationFactory.java |  33 --
 .../src/test/resources/table-state/_metadata       | Bin 22390 -> 26034 bytes
 12 files changed, 864 insertions(+), 349 deletions(-)

diff --git a/docs/content/docs/libs/state_processor_api.md 
b/docs/content/docs/libs/state_processor_api.md
index 56ac18337db..3690de6be58 100644
--- a/docs/content/docs/libs/state_processor_api.md
+++ b/docs/content/docs/libs/state_processor_api.md
@@ -586,13 +586,6 @@ public class StatefulFunction extends 
KeyedProcessFunction<Integer, Integer, Voi
   }
   ...
 }
-
-public class AvroSavepointTypeInformationFactory implements 
SavepointTypeInformationFactory {
-    @Override
-    public TypeInformation<?> getTypeInformation() {
-        return new AvroTypeInfo<>(AvroRecord.class);
-    }
-}
 ```
 
 Then it can read by querying a table created using the following SQL statement:
@@ -609,8 +602,7 @@ CREATE TABLE state_table (
   'connector' = 'savepoint',
   'state.backend.type' = 'rocksdb',
   'state.path' = '/root/dir/of/checkpoint-data/chk-1',
-  'operator.uid' = 'my-uid',
-  'fields.MyAvroState.value-type-factory' = 
'org.apache.flink.state.table.AvroSavepointTypeInformationFactory'
+  'operator.uid' = 'my-uid'
 );
 ```
 
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointLoader.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointLoader.java
index 1b2cdedefaf..e4ea285c0e8 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointLoader.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointLoader.java
@@ -19,15 +19,27 @@
 package org.apache.flink.state.api.runtime;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.runtime.checkpoint.Checkpoints;
+import org.apache.flink.runtime.checkpoint.OperatorState;
 import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
 import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import 
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.state.api.OperatorIdentifier;
 
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
-/** Utility class for loading {@link CheckpointMetadata} metadata. */
+/** Utility class for loading savepoint metadata and operator state 
information. */
 @Internal
 public final class SavepointLoader {
     private SavepointLoader() {}
@@ -55,4 +67,70 @@ public final class SavepointLoader {
                     stream, Thread.currentThread().getContextClassLoader(), 
savepointPath);
         }
     }
+
+    /**
+     * Loads all state metadata for an operator in a single I/O operation.
+     *
+     * @param savepointPath Path to the savepoint directory
+     * @param operatorIdentifier Operator UID or hash
+     * @return Map from state name to StateMetaInfoSnapshot
+     * @throws IOException If reading fails
+     */
+    public static Map<String, StateMetaInfoSnapshot> loadOperatorStateMetadata(
+            String savepointPath, OperatorIdentifier operatorIdentifier) 
throws IOException {
+
+        CheckpointMetadata checkpointMetadata = 
loadSavepointMetadata(savepointPath);
+
+        OperatorState operatorState =
+                checkpointMetadata.getOperatorStates().stream()
+                        .filter(
+                                state ->
+                                        operatorIdentifier
+                                                .getOperatorId()
+                                                .equals(state.getOperatorID()))
+                        .findFirst()
+                        .orElseThrow(
+                                () ->
+                                        new IllegalArgumentException(
+                                                "Operator "
+                                                        + operatorIdentifier
+                                                        + " not found in 
savepoint"));
+
+        KeyedStateHandle keyedStateHandle =
+                operatorState.getStates().stream()
+                        .flatMap(s -> s.getManagedKeyedState().stream())
+                        .findFirst()
+                        .orElseThrow(
+                                () ->
+                                        new IllegalArgumentException(
+                                                "No keyed state found for 
operator "
+                                                        + operatorIdentifier));
+
+        KeyedBackendSerializationProxy<?> proxy = 
readSerializationProxy(keyedStateHandle);
+        return proxy.getStateMetaInfoSnapshots().stream()
+                .collect(Collectors.toMap(StateMetaInfoSnapshot::getName, 
Function.identity()));
+    }
+
+    private static KeyedBackendSerializationProxy<?> readSerializationProxy(
+            KeyedStateHandle stateHandle) throws IOException {
+
+        StreamStateHandle streamStateHandle;
+        if (stateHandle instanceof KeyGroupsStateHandle) {
+            streamStateHandle = ((KeyGroupsStateHandle) 
stateHandle).getDelegateStateHandle();
+        } else {
+            throw new IllegalArgumentException(
+                    "Unsupported KeyedStateHandle type: " + 
stateHandle.getClass());
+        }
+
+        try (FSDataInputStream inputStream = 
streamStateHandle.openInputStream()) {
+            DataInputViewStreamWrapper inputView = new 
DataInputViewStreamWrapper(inputStream);
+
+            KeyedBackendSerializationProxy<?> proxy =
+                    new KeyedBackendSerializationProxy<>(
+                            Thread.currentThread().getContextClassLoader());
+            proxy.read(inputView);
+
+            return proxy;
+        }
+    }
 }
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointDataStreamScanProvider.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointDataStreamScanProvider.java
index 5393e4fa01a..fd24b0f2448 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointDataStreamScanProvider.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointDataStreamScanProvider.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.StateBackendOptions;
@@ -91,32 +92,33 @@ public class SavepointDataStreamScanProvider implements 
DataStreamScanProvider {
 
             // Get value state descriptors
             for (StateValueColumnConfiguration columnConfig : 
keyValueProjections.f1) {
-                TypeInformation valueTypeInfo = 
columnConfig.getValueTypeInfo();
+                TypeSerializer valueTypeSerializer = 
columnConfig.getValueTypeSerializer();
 
                 switch (columnConfig.getStateType()) {
                     case VALUE:
                         columnConfig.setStateDescriptor(
                                 new ValueStateDescriptor<>(
-                                        columnConfig.getStateName(), 
valueTypeInfo));
+                                        columnConfig.getStateName(), 
valueTypeSerializer));
                         break;
 
                     case LIST:
                         columnConfig.setStateDescriptor(
                                 new ListStateDescriptor<>(
-                                        columnConfig.getStateName(), 
valueTypeInfo));
+                                        columnConfig.getStateName(), 
valueTypeSerializer));
                         break;
 
                     case MAP:
-                        TypeInformation<?> mapKeyTypeInfo = 
columnConfig.getMapKeyTypeInfo();
-                        if (mapKeyTypeInfo == null) {
+                        TypeSerializer<?> mapKeyTypeSerializer =
+                                columnConfig.getMapKeyTypeSerializer();
+                        if (mapKeyTypeSerializer == null) {
                             throw new ConfigurationException(
-                                    "Map key type information is required for 
map state");
+                                    "Map key type serializer is required for 
map state");
                         }
                         columnConfig.setStateDescriptor(
                                 new MapStateDescriptor<>(
                                         columnConfig.getStateName(),
-                                        mapKeyTypeInfo,
-                                        valueTypeInfo));
+                                        mapKeyTypeSerializer,
+                                        valueTypeSerializer));
                         break;
 
                     default:
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointDynamicTableSourceFactory.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointDynamicTableSourceFactory.java
index 7ac206d0081..85220c4e536 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointDynamicTableSourceFactory.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointDynamicTableSourceFactory.java
@@ -18,12 +18,17 @@
 
 package org.apache.flink.state.table;
 
+import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.utils.TypeUtils;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.state.api.OperatorIdentifier;
+import org.apache.flink.state.api.runtime.SavepointLoader;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
@@ -31,26 +36,23 @@ import 
org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
-import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
 import org.apache.flink.util.Preconditions;
 
-import javax.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.math.BigDecimal;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Optional;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import static org.apache.flink.configuration.ConfigOptions.key;
 import static org.apache.flink.state.table.SavepointConnectorOptions.FIELDS;
 import static org.apache.flink.state.table.SavepointConnectorOptions.KEY_CLASS;
 import static 
org.apache.flink.state.table.SavepointConnectorOptions.KEY_CLASS_PLACEHOLDER;
@@ -73,15 +75,27 @@ import static 
org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
 
 /** Dynamic source factory for {@link SavepointDynamicTableSource}. */
 public class SavepointDynamicTableSourceFactory implements 
DynamicTableSourceFactory {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(SavepointDynamicTableSourceFactory.class);
+
     @Override
     public DynamicTableSource createDynamicTableSource(Context context) {
         Configuration options = new Configuration();
         context.getCatalogTable().getOptions().forEach(options::setString);
+        SerializerConfig serializerConfig = new SerializerConfigImpl(options);
 
         final String stateBackendType = 
options.getOptional(STATE_BACKEND_TYPE).orElse(null);
         final String statePath = options.get(STATE_PATH);
         final OperatorIdentifier operatorIdentifier = 
getOperatorIdentifier(options);
 
+        final Map<String, StateMetaInfoSnapshot> preloadedStateMetadata =
+                preloadStateMetadata(statePath, operatorIdentifier);
+
+        // Create resolver with preloaded metadata
+        SavepointTypeInfoResolver typeResolver =
+                new SavepointTypeInfoResolver(preloadedStateMetadata, 
serializerConfig);
+
         final Tuple2<Integer, int[]> keyValueProjections =
                 createKeyValueProjections(context.getCatalogTable());
 
@@ -94,127 +108,29 @@ public class SavepointDynamicTableSourceFactory 
implements DynamicTableSourceFac
 
         RowType.RowField keyRowField = 
rowType.getFields().get(keyValueProjections.f0);
         ConfigOption<String> keyFormatOption =
-                key(String.format("%s.%s.%s", FIELDS, keyRowField.getName(), 
VALUE_CLASS))
-                        .stringType()
-                        .noDefaultValue();
+                optionOf(keyRowField.getName(), 
VALUE_CLASS).stringType().noDefaultValue();
         optionalOptions.add(keyFormatOption);
 
         ConfigOption<String> keyTypeInfoFactoryOption =
-                key(String.format("%s.%s.%s", FIELDS, keyRowField.getName(), 
VALUE_TYPE_FACTORY))
-                        .stringType()
-                        .noDefaultValue();
+                optionOf(keyRowField.getName(), 
VALUE_TYPE_FACTORY).stringType().noDefaultValue();
         optionalOptions.add(keyTypeInfoFactoryOption);
 
         TypeInformation<?> keyTypeInfo =
-                getTypeInfo(options, keyFormatOption, 
keyTypeInfoFactoryOption, keyRowField, true);
+                typeResolver.resolveKeyType(
+                        options, keyFormatOption, keyTypeInfoFactoryOption, 
keyRowField);
 
         final Tuple2<Integer, List<StateValueColumnConfiguration>> 
keyValueConfigProjections =
                 Tuple2.of(
                         keyValueProjections.f0,
                         Arrays.stream(keyValueProjections.f1)
                                 .mapToObj(
-                                        columnIndex -> {
-                                            RowType.RowField valueRowField =
-                                                    
rowType.getFields().get(columnIndex);
-
-                                            ConfigOption<String> 
stateNameOption =
-                                                    key(String.format(
-                                                                    "%s.%s.%s",
-                                                                    FIELDS,
-                                                                    
valueRowField.getName(),
-                                                                    
STATE_NAME))
-                                                            .stringType()
-                                                            .noDefaultValue();
-                                            
optionalOptions.add(stateNameOption);
-
-                                            
ConfigOption<SavepointConnectorOptions.StateType>
-                                                    stateTypeOption =
-                                                            key(String.format(
-                                                                            
"%s.%s.%s",
-                                                                            
FIELDS,
-                                                                            
valueRowField.getName(),
-                                                                            
STATE_TYPE))
-                                                                    .enumType(
-                                                                            
SavepointConnectorOptions
-                                                                               
     .StateType
-                                                                               
     .class)
-                                                                    
.noDefaultValue();
-                                            
optionalOptions.add(stateTypeOption);
-
-                                            ConfigOption<String> 
mapKeyFormatOption =
-                                                    key(String.format(
-                                                                    "%s.%s.%s",
-                                                                    FIELDS,
-                                                                    
valueRowField.getName(),
-                                                                    KEY_CLASS))
-                                                            .stringType()
-                                                            .noDefaultValue();
-                                            
optionalOptions.add(mapKeyFormatOption);
-
-                                            ConfigOption<String> 
mapKeyTypeInfoFactoryOption =
-                                                    key(String.format(
-                                                                    "%s.%s.%s",
-                                                                    FIELDS,
-                                                                    
valueRowField.getName(),
-                                                                    
KEY_TYPE_FACTORY))
-                                                            .stringType()
-                                                            .noDefaultValue();
-                                            
optionalOptions.add(mapKeyTypeInfoFactoryOption);
-
-                                            ConfigOption<String> 
valueFormatOption =
-                                                    key(String.format(
-                                                                    "%s.%s.%s",
-                                                                    FIELDS,
-                                                                    
valueRowField.getName(),
-                                                                    
VALUE_CLASS))
-                                                            .stringType()
-                                                            .noDefaultValue();
-                                            
optionalOptions.add(valueFormatOption);
-
-                                            ConfigOption<String> 
valueTypeInfoFactoryOption =
-                                                    key(String.format(
-                                                                    "%s.%s.%s",
-                                                                    FIELDS,
-                                                                    
valueRowField.getName(),
-                                                                    
VALUE_TYPE_FACTORY))
-                                                            .stringType()
-                                                            .noDefaultValue();
-                                            
optionalOptions.add(valueTypeInfoFactoryOption);
-
-                                            LogicalType valueLogicalType = 
valueRowField.getType();
-
-                                            
SavepointConnectorOptions.StateType stateType =
-                                                    
options.getOptional(stateTypeOption)
-                                                            .orElseGet(
-                                                                    () ->
-                                                                            
inferStateType(
-                                                                               
     valueLogicalType));
-
-                                            TypeInformation<?> mapKeyTypeInfo =
-                                                    getTypeInfo(
-                                                            options,
-                                                            keyFormatOption,
-                                                            
mapKeyTypeInfoFactoryOption,
-                                                            valueRowField,
-                                                            stateType.equals(
-                                                                    
SavepointConnectorOptions
-                                                                            
.StateType.MAP));
-
-                                            TypeInformation<?> valueTypeInfo =
-                                                    getTypeInfo(
-                                                            options,
-                                                            valueFormatOption,
-                                                            
valueTypeInfoFactoryOption,
-                                                            valueRowField,
-                                                            true);
-                                            return new 
StateValueColumnConfiguration(
-                                                    columnIndex,
-                                                    
options.getOptional(stateNameOption)
-                                                            
.orElse(valueRowField.getName()),
-                                                    stateType,
-                                                    mapKeyTypeInfo,
-                                                    valueTypeInfo);
-                                        })
+                                        columnIndex ->
+                                                createStateColumnConfiguration(
+                                                        columnIndex,
+                                                        rowType,
+                                                        options,
+                                                        optionalOptions,
+                                                        typeResolver))
                                 .collect(Collectors.toList()));
         FactoryUtil.validateFactoryOptions(requiredOptions, optionalOptions, 
options);
 
@@ -234,6 +150,77 @@ public class SavepointDynamicTableSourceFactory implements 
DynamicTableSourceFac
                 rowType);
     }
 
+    private StateValueColumnConfiguration createStateColumnConfiguration(
+            int columnIndex,
+            RowType rowType,
+            Configuration options,
+            Set<ConfigOption<?>> optionalOptions,
+            SavepointTypeInfoResolver typeResolver) {
+
+        RowType.RowField valueRowField = rowType.getFields().get(columnIndex);
+
+        ConfigOption<String> stateNameOption =
+                optionOf(valueRowField.getName(), 
STATE_NAME).stringType().noDefaultValue();
+        optionalOptions.add(stateNameOption);
+
+        ConfigOption<SavepointConnectorOptions.StateType> stateTypeOption =
+                optionOf(valueRowField.getName(), STATE_TYPE)
+                        .enumType(SavepointConnectorOptions.StateType.class)
+                        .noDefaultValue();
+        optionalOptions.add(stateTypeOption);
+
+        ConfigOption<String> mapKeyFormatOption =
+                optionOf(valueRowField.getName(), 
KEY_CLASS).stringType().noDefaultValue();
+        optionalOptions.add(mapKeyFormatOption);
+
+        ConfigOption<String> mapKeyTypeInfoFactoryOption =
+                optionOf(valueRowField.getName(), 
KEY_TYPE_FACTORY).stringType().noDefaultValue();
+        optionalOptions.add(mapKeyTypeInfoFactoryOption);
+
+        ConfigOption<String> valueFormatOption =
+                optionOf(valueRowField.getName(), 
VALUE_CLASS).stringType().noDefaultValue();
+        optionalOptions.add(valueFormatOption);
+
+        ConfigOption<String> valueTypeInfoFactoryOption =
+                optionOf(valueRowField.getName(), 
VALUE_TYPE_FACTORY).stringType().noDefaultValue();
+        optionalOptions.add(valueTypeInfoFactoryOption);
+
+        LogicalType valueLogicalType = valueRowField.getType();
+
+        SavepointConnectorOptions.StateType stateType =
+                options.getOptional(stateTypeOption)
+                        .orElseGet(() -> inferStateType(valueLogicalType));
+
+        TypeSerializer<?> mapKeyTypeSerializer =
+                typeResolver.resolveSerializer(
+                        options,
+                        mapKeyFormatOption,
+                        mapKeyTypeInfoFactoryOption,
+                        valueRowField,
+                        
stateType.equals(SavepointConnectorOptions.StateType.MAP),
+                        SavepointTypeInfoResolver.InferenceContext.MAP_KEY);
+
+        TypeSerializer<?> valueTypeSerializer =
+                typeResolver.resolveSerializer(
+                        options,
+                        valueFormatOption,
+                        valueTypeInfoFactoryOption,
+                        valueRowField,
+                        true,
+                        SavepointTypeInfoResolver.InferenceContext.VALUE);
+
+        return new StateValueColumnConfiguration(
+                columnIndex,
+                
options.getOptional(stateNameOption).orElse(valueRowField.getName()),
+                stateType,
+                mapKeyTypeSerializer,
+                valueTypeSerializer);
+    }
+
+    private static ConfigOptions.OptionBuilder optionOf(String rowField, 
String optionName) {
+        return ConfigOptions.key(String.format("%s.%s.%s", FIELDS, rowField, 
optionName));
+    }
+
     private Tuple2<Integer, int[]> 
createKeyValueProjections(ResolvedCatalogTable catalogTable) {
         ResolvedSchema schema = catalogTable.getResolvedSchema();
         if (schema.getPrimaryKey().isEmpty()) {
@@ -271,46 +258,6 @@ public class SavepointDynamicTableSourceFactory implements 
DynamicTableSourceFac
         return physicalFields.filter(pos -> keyProjection != pos).toArray();
     }
 
-    private TypeInformation<?> getTypeInfo(
-            Configuration options,
-            ConfigOption<String> classOption,
-            ConfigOption<String> typeInfoFactoryOption,
-            RowType.RowField rowField,
-            boolean inferStateType) {
-        Optional<String> clazz = options.getOptional(classOption);
-        Optional<String> typeInfoFactory = 
options.getOptional(typeInfoFactoryOption);
-        if (clazz.isPresent() && typeInfoFactory.isPresent()) {
-            throw new IllegalArgumentException(
-                    "Either "
-                            + classOption.key()
-                            + " or "
-                            + typeInfoFactoryOption.key()
-                            + " can be specified for column "
-                            + rowField.getName()
-                            + ".");
-        }
-        try {
-            if (clazz.isPresent()) {
-                return TypeInformation.of(Class.forName(clazz.get()));
-            } else if (typeInfoFactory.isPresent()) {
-                SavepointTypeInformationFactory 
savepointTypeInformationFactory =
-                        (SavepointTypeInformationFactory)
-                                TypeUtils.getInstance(typeInfoFactory.get(), 
new Object[0]);
-                return savepointTypeInformationFactory.getTypeInformation();
-            } else {
-                if (inferStateType) {
-                    String inferredValueFormat =
-                            inferStateValueFormat(rowField.getName(), 
rowField.getType());
-                    return 
TypeInformation.of(Class.forName(inferredValueFormat));
-                } else {
-                    return null;
-                }
-            }
-        } catch (ReflectiveOperationException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
     private SavepointConnectorOptions.StateType inferStateType(LogicalType 
logicalType) {
         switch (logicalType.getTypeRoot()) {
             case ARRAY:
@@ -324,82 +271,24 @@ public class SavepointDynamicTableSourceFactory 
implements DynamicTableSourceFac
         }
     }
 
-    @Nullable
-    private String inferStateMapKeyFormat(String columnName, LogicalType 
logicalType) {
-        return logicalType.is(LogicalTypeRoot.MAP)
-                ? inferStateValueFormat(columnName, ((MapType) 
logicalType).getKeyType())
-                : null;
-    }
-
-    private String inferStateValueFormat(String columnName, LogicalType 
logicalType) {
-        switch (logicalType.getTypeRoot()) {
-            case CHAR:
-            case VARCHAR:
-                return String.class.getName();
-
-            case BOOLEAN:
-                return Boolean.class.getName();
-
-            case BINARY:
-            case VARBINARY:
-                return byte[].class.getName();
-
-            case DECIMAL:
-                return BigDecimal.class.getName();
-
-            case TINYINT:
-                return Byte.class.getName();
-
-            case SMALLINT:
-                return Short.class.getName();
-
-            case INTEGER:
-                return Integer.class.getName();
-
-            case BIGINT:
-                return Long.class.getName();
-
-            case FLOAT:
-                return Float.class.getName();
-
-            case DOUBLE:
-                return Double.class.getName();
-
-            case DATE:
-                return Integer.class.getName();
-
-            case INTERVAL_YEAR_MONTH:
-            case INTERVAL_DAY_TIME:
-                return Long.class.getName();
-
-            case ARRAY:
-                return inferStateValueFormat(
-                        columnName, ((ArrayType) 
logicalType).getElementType());
-
-            case MAP:
-                return inferStateValueFormat(columnName, ((MapType) 
logicalType).getValueType());
-
-            case NULL:
-                return null;
-
-            case ROW:
-            case MULTISET:
-            case TIME_WITHOUT_TIME_ZONE:
-            case TIMESTAMP_WITHOUT_TIME_ZONE:
-            case TIMESTAMP_WITH_TIME_ZONE:
-            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-            case DISTINCT_TYPE:
-            case STRUCTURED_TYPE:
-            case RAW:
-            case SYMBOL:
-            case UNRESOLVED:
-            case DESCRIPTOR:
-            default:
-                throw new UnsupportedOperationException(
-                        String.format(
-                                "Unable to infer state format for SQL type: %s 
in column: %s. "
-                                        + "Please override the type with the 
following config parameter: %s.%s.%s",
-                                logicalType, columnName, FIELDS, columnName, 
VALUE_CLASS));
+    /**
+     * Preloads all state metadata for an operator in a single I/O operation.
+     *
+     * @param savepointPath Path to the savepoint
+     * @param operatorIdentifier Operator UID or hash
+     * @return Map from state name to StateMetaInfoSnapshot
+     */
+    private Map<String, StateMetaInfoSnapshot> preloadStateMetadata(
+            String savepointPath, OperatorIdentifier operatorIdentifier) {
+        try {
+            return SavepointLoader.loadOperatorStateMetadata(savepointPath, 
operatorIdentifier);
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    String.format(
+                            "Failed to load state metadata from savepoint '%s' 
for operator '%s'. "
+                                    + "Ensure the savepoint path is valid and 
the operator exists in the savepoint. ",
+                            savepointPath, operatorIdentifier),
+                    e);
         }
     }
 
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointTypeInfoResolver.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointTypeInfoResolver.java
new file mode 100644
index 00000000000..c2571ce1988
--- /dev/null
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointTypeInfoResolver.java
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.utils.TypeUtils;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.base.MapSerializer;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigDecimal;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.state.table.SavepointConnectorOptions.FIELDS;
+import static 
org.apache.flink.state.table.SavepointConnectorOptions.VALUE_CLASS;
+
+/** Resolver for TypeInformation from savepoint metadata and configuration. */
+@Internal
+class SavepointTypeInfoResolver {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SavepointTypeInfoResolver.class);
+
+    /** Context for type inference to determine what aspect of the type we 
need. */
+    enum InferenceContext {
+        /** Inferring the key type of keyed state (always primitive). */
+        KEY,
+        /** Inferring the key type of a MAP state. */
+        MAP_KEY,
+        /** Inferring the value type (behavior depends on logical type). */
+        VALUE
+    }
+
+    private final Map<String, StateMetaInfoSnapshot> preloadedStateMetadata;
+    private final SerializerConfig serializerConfig;
+
+    public SavepointTypeInfoResolver(
+            Map<String, StateMetaInfoSnapshot> preloadedStateMetadata,
+            SerializerConfig serializerConfig) {
+        this.preloadedStateMetadata = preloadedStateMetadata;
+        this.serializerConfig = serializerConfig;
+    }
+
+    /**
+     * Resolves TypeInformation for keyed state keys (primitive types only).
+     *
+     * <p>This is a simplified version of type resolution specifically for key 
types, which are
+     * always primitive and don't require complex metadata inference.
+     *
+     * @param options Configuration containing table options
+     * @param classOption Config option for explicit class specification
+     * @param typeInfoFactoryOption Config option for type factory 
specification
+     * @param rowField The row field containing name and LogicalType
+     * @return The resolved TypeInformation for the key
+     * @throws IllegalArgumentException If both class and factory options are 
specified
+     * @throws RuntimeException If type instantiation fails
+     */
+    public TypeInformation<?> resolveKeyType(
+            Configuration options,
+            ConfigOption<String> classOption,
+            ConfigOption<String> typeInfoFactoryOption,
+            RowType.RowField rowField) {
+        try {
+            // Priority 1: Explicit configuration (backward compatibility)
+            TypeInformation<?> explicitTypeInfo =
+                    getExplicitTypeInfo(options, classOption, 
typeInfoFactoryOption);
+            if (explicitTypeInfo != null) {
+                return explicitTypeInfo;
+            }
+
+            // Priority 2: Simple primitive type inference from LogicalType
+            LogicalType logicalType = rowField.getType();
+            String columnName = rowField.getName();
+            return TypeInformation.of(getPrimitiveClass(logicalType, 
columnName));
+        } catch (ReflectiveOperationException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Resolves TypeSerializer for a table field using a three-tier priority 
system with direct
+     * serializer extraction for metadata inference.
+     *
+     * <h3>Three-Tier Priority System (Serializer-First)</h3>
+     *
+     * <ol>
+     *   <li><strong>Priority 1: Explicit Configuration</strong> (Highest 
priority) <br>
+     *       Uses user-specified class name or type factory from table 
options, then converts to
+     *       serializer.
+     *   <li><strong>Priority 2: Metadata Inference</strong> <br>
+     *       Directly extracts serializers from preloaded savepoint metadata 
(NO TypeInformation
+     *       conversion).
+     *   <li><strong>Priority 3: LogicalType Fallback</strong> (Lowest 
priority) <br>
+     *       Infers TypeInformation from table schema's LogicalType, then 
converts to serializer.
+     * </ol>
+     *
+     * <p>This approach eliminates TypeInformation extraction complexity for 
metadata inference,
+     * making it work with ANY serializer type (Avro, custom types, etc.).
+     *
+     * @param options Configuration containing table options
+     * @param classOption Config option for explicit class specification
+     * @param typeInfoFactoryOption Config option for type factory 
specification
+     * @param rowField The table field containing name and LogicalType
+     * @param inferStateType Whether to enable automatic type inference. If 
false, returns null when
+     *     no explicit configuration is provided.
+     * @param context The inference context determining what type aspect to 
extract.
+     * @return The resolved TypeSerializer, or null if inferStateType is false 
and no explicit
+     *     configuration is provided.
+     * @throws IllegalArgumentException If both class and factory options are 
specified
+     * @throws RuntimeException If serializer creation fails
+     */
+    public TypeSerializer<?> resolveSerializer(
+            Configuration options,
+            ConfigOption<String> classOption,
+            ConfigOption<String> typeInfoFactoryOption,
+            RowType.RowField rowField,
+            boolean inferStateType,
+            InferenceContext context) {
+        try {
+            // Priority 1: Explicit configuration (backward compatibility)
+            TypeInformation<?> explicitTypeInfo =
+                    getExplicitTypeInfo(options, classOption, 
typeInfoFactoryOption);
+            if (explicitTypeInfo != null) {
+                return explicitTypeInfo.createSerializer(serializerConfig);
+            }
+            if (!inferStateType) {
+                return null;
+            }
+
+            // Priority 2: Direct serializer extraction from metadata
+            Optional<TypeSerializer<?>> metadataSerializer =
+                    getSerializerFromMetadata(rowField, context);
+            if (metadataSerializer.isPresent()) {
+                LOG.info(
+                        "Using serializer directly from metadata for state 
'{}' with context {}: {}",
+                        rowField.getName(),
+                        context,
+                        metadataSerializer.get().getClass().getSimpleName());
+                return metadataSerializer.get();
+            }
+
+            // Priority 3: Fallback to LogicalType-based inference
+            TypeInformation<?> fallbackTypeInfo = 
inferTypeFromLogicalType(rowField, context);
+            return fallbackTypeInfo.createSerializer(serializerConfig);
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Failed to resolve serializer for field " + 
rowField.getName(), e);
+        }
+    }
+
+    /**
+     * Extracts explicit TypeInformation from user configuration (Priority 1).
+     *
+     * @param options Configuration containing table options
+     * @param classOption Config option for explicit class specification
+     * @param typeInfoFactoryOption Config option for type factory 
specification
+     * @return The explicit TypeInformation if specified, null otherwise
+     * @throws IllegalArgumentException If both class and factory options are 
specified
+     * @throws ReflectiveOperationException If type instantiation fails
+     */
+    private TypeInformation<?> getExplicitTypeInfo(
+            Configuration options,
+            ConfigOption<String> classOption,
+            ConfigOption<String> typeInfoFactoryOption)
+            throws ReflectiveOperationException {
+
+        Optional<String> clazz = options.getOptional(classOption);
+        Optional<String> typeInfoFactory = 
options.getOptional(typeInfoFactoryOption);
+
+        if (clazz.isPresent() && typeInfoFactory.isPresent()) {
+            throw new IllegalArgumentException(
+                    "Either "
+                            + classOption.key()
+                            + " or "
+                            + typeInfoFactoryOption.key()
+                            + " can be specified, not both.");
+        }
+
+        if (clazz.isPresent()) {
+            return TypeInformation.of(Class.forName(clazz.get()));
+        } else if (typeInfoFactory.isPresent()) {
+            SavepointTypeInformationFactory savepointTypeInformationFactory =
+                    (SavepointTypeInformationFactory)
+                            TypeUtils.getInstance(typeInfoFactory.get(), new 
Object[0]);
+            return savepointTypeInformationFactory.getTypeInformation();
+        }
+
+        return null;
+    }
+
+    /**
+     * Directly extracts TypeSerializer from preloaded metadata (Priority 2).
+     *
+     * <p>This method performs NO I/O and NO TypeInformation conversion. It 
directly extracts the
+     * serializer that was used to write the state data.
+     *
+     * @param rowField The row field to extract serializer for
+     * @param context The inference context determining what serializer to 
extract
+     * @return The serializer if found in metadata, empty otherwise
+     */
+    private Optional<TypeSerializer<?>> getSerializerFromMetadata(
+            RowType.RowField rowField, InferenceContext context) {
+        try {
+            // Get state name for this field (defaults to field name)
+            String stateName = rowField.getName();
+
+            // Look up from preloaded metadata (NO I/O)
+            StateMetaInfoSnapshot stateMetaInfo = 
preloadedStateMetadata.get(stateName);
+
+            if (stateMetaInfo == null) {
+                LOG.debug("State '{}' not found in preloaded metadata", 
stateName);
+                return Optional.empty();
+            }
+
+            // Extract appropriate serializer based on context
+            TypeSerializerSnapshot<?> serializerSnapshot = null;
+            switch (context) {
+                case KEY:
+                    serializerSnapshot =
+                            stateMetaInfo.getTypeSerializerSnapshot(
+                                    
StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER);
+                    break;
+                case MAP_KEY:
+                    // For MAP_KEY, we need the key serializer from the value 
serializer
+                    // (which is MapSerializer)
+                    TypeSerializerSnapshot<?> valueSnapshot =
+                            stateMetaInfo.getTypeSerializerSnapshot(
+                                    
StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER);
+                    if (valueSnapshot != null) {
+                        TypeSerializer<?> valueSerializer = 
valueSnapshot.restoreSerializer();
+                        if (valueSerializer instanceof MapSerializer) {
+                            serializerSnapshot =
+                                    ((MapSerializer<?, ?>) valueSerializer)
+                                            .getKeySerializer()
+                                            .snapshotConfiguration();
+                        }
+                    }
+                    break;
+                case VALUE:
+                    serializerSnapshot =
+                            stateMetaInfo.getTypeSerializerSnapshot(
+                                    
StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER);
+                    break;
+            }
+
+            if (serializerSnapshot == null) {
+                LOG.debug(
+                        "No serializer snapshot found for state '{}' with 
context {}",
+                        stateName,
+                        context);
+                return Optional.empty();
+            }
+
+            // Restore serializer from snapshot
+            TypeSerializer<?> serializer = 
serializerSnapshot.restoreSerializer();
+
+            // For VALUE context with complex types, extract the appropriate 
sub-serializer
+            if (context == InferenceContext.VALUE) {
+                return extractValueSerializerForLogicalType(serializer, 
rowField.getType());
+            }
+
+            return Optional.of(serializer);
+
+        } catch (Exception e) {
+            LOG.warn(
+                    "Failed to extract serializer from metadata for field 
'{}': {}",
+                    rowField.getName(),
+                    e.getMessage());
+            return Optional.empty();
+        }
+    }
+
+    /**
+     * Extracts the appropriate value serializer based on LogicalType for 
VALUE context.
+     *
+     * @param fullSerializer The complete serializer from metadata
+     * @param logicalType The LogicalType from the table schema
+     * @return The appropriate value serializer
+     */
+    private Optional<TypeSerializer<?>> extractValueSerializerForLogicalType(
+            TypeSerializer<?> fullSerializer, LogicalType logicalType) {
+
+        switch (logicalType.getTypeRoot()) {
+            case ARRAY:
+                // ARRAY logical type → LIST state → extract element serializer
+                if (fullSerializer
+                        instanceof 
org.apache.flink.api.common.typeutils.base.ListSerializer) {
+                    
org.apache.flink.api.common.typeutils.base.ListSerializer<?> listSerializer =
+                            
(org.apache.flink.api.common.typeutils.base.ListSerializer<?>)
+                                    fullSerializer;
+                    return Optional.of(listSerializer.getElementSerializer());
+                }
+                LOG.debug(
+                        "Expected ListSerializer for ARRAY logical type but 
got: {}",
+                        fullSerializer.getClass());
+                return Optional.empty();
+
+            case MAP:
+                // MAP logical type → MAP state → extract value serializer
+                if (fullSerializer instanceof MapSerializer) {
+                    return Optional.of(((MapSerializer<?, ?>) 
fullSerializer).getValueSerializer());
+                }
+                LOG.debug(
+                        "Expected MapSerializer for MAP logical type but got: 
{}",
+                        fullSerializer.getClass());
+                return Optional.empty();
+
+            default:
+                // Primitive logical type → VALUE state → use serializer as-is
+                return Optional.of(fullSerializer);
+        }
+    }
+
+    /**
+     * Fallback inference using LogicalType when metadata extraction fails.
+     *
+     * @param rowField The row field to infer type for
+     * @param context The inference context
+     * @return The inferred TypeInformation
+     */
+    private TypeInformation<?> inferTypeFromLogicalType(
+            RowType.RowField rowField, InferenceContext context) {
+
+        LogicalType logicalType = rowField.getType();
+        String columnName = rowField.getName();
+
+        try {
+            switch (context) {
+                case KEY:
+                    // Keys are always primitive
+                    return TypeInformation.of(getPrimitiveClass(logicalType, 
columnName));
+
+                case MAP_KEY:
+                    // Extract key type from MAP logical type
+                    if (logicalType instanceof MapType) {
+                        LogicalType keyType = ((MapType) 
logicalType).getKeyType();
+                        return TypeInformation.of(getPrimitiveClass(keyType, 
columnName));
+                    }
+                    throw new UnsupportedOperationException(
+                            "MAP_KEY context requires MAP logical type, but 
got: " + logicalType);
+
+                case VALUE:
+                    return inferValueTypeFromLogicalType(logicalType, 
columnName);
+
+                default:
+                    throw new UnsupportedOperationException("Unknown context: 
" + context);
+            }
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException("Failed to infer type for context " + 
context, e);
+        }
+    }
+
+    /**
+     * Infers value type from LogicalType for VALUE context fallback.
+     *
+     * @param logicalType The LogicalType
+     * @param columnName The column name for error messages
+     * @return The inferred TypeInformation
+     */
+    private TypeInformation<?> inferValueTypeFromLogicalType(
+            LogicalType logicalType, String columnName) throws 
ClassNotFoundException {
+
+        switch (logicalType.getTypeRoot()) {
+            case ARRAY:
+                // ARRAY logical type → LIST state → return element type
+                ArrayType arrayType = (ArrayType) logicalType;
+                return TypeInformation.of(
+                        getPrimitiveClass(arrayType.getElementType(), 
columnName));
+
+            case MAP:
+                // MAP logical type → MAP state → return value type
+                MapType mapType = (MapType) logicalType;
+                return 
TypeInformation.of(getPrimitiveClass(mapType.getValueType(), columnName));
+
+            default:
+                // Primitive logical type → VALUE state → return primitive type
+                return TypeInformation.of(getPrimitiveClass(logicalType, 
columnName));
+        }
+    }
+
+    /**
+     * Maps LogicalType to primitive Java class.
+     *
+     * @param logicalType The LogicalType to map
+     * @param columnName The column name for error messages
+     * @return The corresponding Java class
+     */
+    private Class<?> getPrimitiveClass(LogicalType logicalType, String 
columnName)
+            throws ClassNotFoundException {
+        String className = inferTypeInfoClassFromLogicalType(columnName, 
logicalType);
+        return Class.forName(className);
+    }
+
+    private String inferTypeInfoClassFromLogicalType(String columnName, 
LogicalType logicalType) {
+        switch (logicalType.getTypeRoot()) {
+            case CHAR:
+            case VARCHAR:
+                return String.class.getName();
+
+            case BOOLEAN:
+                return Boolean.class.getName();
+
+            case BINARY:
+            case VARBINARY:
+                return byte[].class.getName();
+
+            case DECIMAL:
+                return BigDecimal.class.getName();
+
+            case TINYINT:
+                return Byte.class.getName();
+
+            case SMALLINT:
+                return Short.class.getName();
+
+            case INTEGER:
+                return Integer.class.getName();
+
+            case BIGINT:
+                return Long.class.getName();
+
+            case FLOAT:
+                return Float.class.getName();
+
+            case DOUBLE:
+                return Double.class.getName();
+
+            case DATE:
+                return Integer.class.getName();
+
+            case INTERVAL_YEAR_MONTH:
+            case INTERVAL_DAY_TIME:
+                return Long.class.getName();
+
+            case ARRAY:
+                return inferTypeInfoClassFromLogicalType(
+                        columnName, ((ArrayType) 
logicalType).getElementType());
+
+            case MAP:
+                return inferTypeInfoClassFromLogicalType(
+                        columnName, ((MapType) logicalType).getValueType());
+
+            case NULL:
+                return null;
+
+            case ROW:
+            case MULTISET:
+            case TIME_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+            case DISTINCT_TYPE:
+            case STRUCTURED_TYPE:
+            case RAW:
+            case SYMBOL:
+            case UNRESOLVED:
+            case DESCRIPTOR:
+            default:
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "Unable to infer state format for SQL type: %s 
in column: %s. "
+                                        + "Please override the type with the 
following config parameter: %s.%s.%s",
+                                logicalType, columnName, FIELDS, columnName, 
VALUE_CLASS));
+        }
+    }
+}
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/StateValueColumnConfiguration.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/StateValueColumnConfiguration.java
index fa622bb0a10..865077717fc 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/StateValueColumnConfiguration.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/StateValueColumnConfiguration.java
@@ -19,7 +19,7 @@
 package org.apache.flink.state.table;
 
 import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 
 import javax.annotation.Nullable;
 
@@ -31,21 +31,21 @@ public class StateValueColumnConfiguration implements 
Serializable {
     private final int columnIndex;
     private final String stateName;
     private final SavepointConnectorOptions.StateType stateType;
-    @Nullable private final TypeInformation mapKeyTypeInfo;
-    @Nullable private final TypeInformation valueTypeInfo;
+    @Nullable private final TypeSerializer mapKeyTypeSerializer;
+    @Nullable private final TypeSerializer valueTypeSerializer;
     @Nullable private StateDescriptor stateDescriptor;
 
     public StateValueColumnConfiguration(
             int columnIndex,
             final String stateName,
             final SavepointConnectorOptions.StateType stateType,
-            @Nullable final TypeInformation mapKeyTypeInfo,
-            final TypeInformation valueTypeInfo) {
+            @Nullable final TypeSerializer mapKeyTypeSerializer,
+            final TypeSerializer valueTypeSerializer) {
         this.columnIndex = columnIndex;
         this.stateName = stateName;
         this.stateType = stateType;
-        this.mapKeyTypeInfo = mapKeyTypeInfo;
-        this.valueTypeInfo = valueTypeInfo;
+        this.mapKeyTypeSerializer = mapKeyTypeSerializer;
+        this.valueTypeSerializer = valueTypeSerializer;
     }
 
     public int getColumnIndex() {
@@ -61,12 +61,12 @@ public class StateValueColumnConfiguration implements 
Serializable {
     }
 
     @Nullable
-    public TypeInformation getMapKeyTypeInfo() {
-        return mapKeyTypeInfo;
+    public TypeSerializer getMapKeyTypeSerializer() {
+        return mapKeyTypeSerializer;
     }
 
-    public TypeInformation getValueTypeInfo() {
-        return valueTypeInfo;
+    public TypeSerializer getValueTypeSerializer() {
+        return valueTypeSerializer;
     }
 
     public void setStateDescriptor(StateDescriptor stateDescriptor) {
diff --git 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/GenericAvroSavepointTypeInformationFactory.java
 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/GenericAvroSavepointTypeInformationFactory.java
deleted file mode 100644
index b73e3273e1f..00000000000
--- 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/GenericAvroSavepointTypeInformationFactory.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.state.table;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
-
-import com.example.state.writer.job.schema.avro.AvroRecord;
-
-/** {@link SavepointTypeInformationFactory} for generic avro record. */
-public class GenericAvroSavepointTypeInformationFactory implements 
SavepointTypeInformationFactory {
-    @Override
-    public TypeInformation<?> getTypeInformation() {
-        return new GenericRecordAvroTypeInfo(AvroRecord.getClassSchema());
-    }
-}
diff --git 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/SavepointDynamicTableSourceTest.java
 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/SavepointDynamicTableSourceTest.java
index 9afeb85a660..5ddcc9fab64 100644
--- 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/SavepointDynamicTableSourceTest.java
+++ 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/SavepointDynamicTableSourceTest.java
@@ -38,7 +38,7 @@ import static 
org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Unit tests for the savepoint SQL reader. */
-public class SavepointDynamicTableSourceTest {
+class SavepointDynamicTableSourceTest {
     @Test
     @SuppressWarnings("unchecked")
     public void testReadKeyedState() throws Exception {
@@ -53,20 +53,19 @@ public class SavepointDynamicTableSourceTest {
                         + "  KeyedPrimitiveValue bigint,\n"
                         + "  KeyedPojoValue ROW<privateLong bigint, publicLong 
bigint>,\n"
                         + "  KeyedPrimitiveValueList ARRAY<bigint>,\n"
-                        + "  KeyedPrimitiveValueMap MAP<bigint, bigint>,\n"
+                        + "  KeyedPrimitiveValueMap MAP<string, bigint>,\n"
                         + "  PRIMARY KEY (k) NOT ENFORCED\n"
                         + ")\n"
                         + "with (\n"
                         + "  'connector' = 'savepoint',\n"
                         + "  'state.path' = 
'src/test/resources/table-state',\n"
-                        + "  'operator.uid' = 'keyed-state-process-uid',\n"
-                        + "  'fields.KeyedPojoValue.value-class' = 
'com.example.state.writer.job.schema.PojoData'\n"
+                        + "  'operator.uid' = 'keyed-state-process-uid'\n"
                         + ")";
         tEnv.executeSql(sql);
         Table table = tEnv.sqlQuery("SELECT * FROM state_table");
         List<Row> result = tEnv.toDataStream(table).executeAndCollect(100);
 
-        assertThat(result.size()).isEqualTo(10);
+        assertThat(result).hasSize(10);
 
         // Check key
         List<Long> keys =
@@ -79,8 +78,7 @@ public class SavepointDynamicTableSourceTest {
                 result.stream()
                         .map(r -> (Long) r.getField("KeyedPrimitiveValue"))
                         .collect(Collectors.toSet());
-        assertThat(primitiveValues.size()).isEqualTo(1);
-        assertThat(primitiveValues.iterator().next()).isEqualTo(1L);
+        assertThat(primitiveValues).containsExactly(1L);
 
         // Check pojo value state
         Set<Row> pojoValues =
@@ -108,26 +106,27 @@ public class SavepointDynamicTableSourceTest {
         }
 
         // Check map state
-        Set<Tuple2<Long, Map<Long, Long>>> mapValues =
+        Set<Tuple2<Long, Map<String, Long>>> mapValues =
                 result.stream()
                         .map(
                                 r ->
                                         Tuple2.of(
                                                 (Long) r.getField("k"),
-                                                (Map<Long, Long>)
+                                                (Map<String, Long>)
                                                         
r.getField("KeyedPrimitiveValueMap")))
                         .flatMap(l -> Set.of(l).stream())
                         .collect(Collectors.toSet());
         assertThat(mapValues.size()).isEqualTo(10);
-        for (Tuple2<Long, Map<Long, Long>> tuple2 : mapValues) {
+        for (Tuple2<Long, Map<String, Long>> tuple2 : mapValues) {
             assertThat(tuple2.f1.size()).isEqualTo(1);
-            assertThat(tuple2.f0).isEqualTo(tuple2.f1.get(tuple2.f0));
+            String expectedKey = String.valueOf(tuple2.f0);
+            assertThat(tuple2.f1.get(expectedKey)).isEqualTo(tuple2.f0);
         }
     }
 
     @Test
     @SuppressWarnings("DataFlowIssue")
-    public void testReadKeyedStateWithNullValues() throws Exception {
+    void testReadKeyedStateWithNullValues() throws Exception {
         Configuration config = new Configuration();
         config.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH);
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(config);
@@ -142,8 +141,7 @@ public class SavepointDynamicTableSourceTest {
                         + "with (\n"
                         + "  'connector' = 'savepoint',\n"
                         + "  'state.path' = 
'src/test/resources/table-state-nulls',\n"
-                        + "  'operator.uid' = 
'keyed-state-process-uid-null',\n"
-                        + "  'fields.total.value-class' = 
'com.example.state.writer.job.schema.PojoData'\n"
+                        + "  'operator.uid' = 'keyed-state-process-uid-null'\n"
                         + ")";
         tEnv.executeSql(sql);
         Table table = tEnv.sqlQuery("SELECT * FROM state_table");
@@ -169,7 +167,7 @@ public class SavepointDynamicTableSourceTest {
     }
 
     @Test
-    public void testReadAvroKeyedState() throws Exception {
+    void testReadAvroKeyedState() throws Exception {
         Configuration config = new Configuration();
         config.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH);
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(config);
@@ -185,14 +183,12 @@ public class SavepointDynamicTableSourceTest {
                         + "with (\n"
                         + "  'connector' = 'savepoint',\n"
                         + "  'state.path' = 
'src/test/resources/table-state-avro',\n"
-                        + "  'operator.uid' = 'keyed-state-process-uid',\n"
-                        + "  
'fields.KeyedSpecificAvroValue.value-type-factory' = 
'org.apache.flink.state.table.SpecificAvroSavepointTypeInformationFactory',\n"
-                        + "  'fields.KeyedGenericAvroValue.value-type-factory' 
= 'org.apache.flink.state.table.GenericAvroSavepointTypeInformationFactory'\n"
+                        + "  'operator.uid' = 'keyed-state-process-uid'\n"
                         + ")";
         tEnv.executeSql(sql);
         Table table = tEnv.sqlQuery("SELECT * FROM state_table");
         List<Row> result = tEnv.toDataStream(table).executeAndCollect(100);
-        assertThat(result.size()).isEqualTo(10);
+        assertThat(result).hasSize(10);
 
         // Check key
         List<Long> keys =
diff --git 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/SavepointMetadataDynamicTableSourceTest.java
 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/SavepointMetadataDynamicTableSourceTest.java
index 128166d8f57..b81033d573e 100644
--- 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/SavepointMetadataDynamicTableSourceTest.java
+++ 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/SavepointMetadataDynamicTableSourceTest.java
@@ -35,9 +35,9 @@ import static 
org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Unit tests for the savepoint metadata SQL reader. */
-public class SavepointMetadataDynamicTableSourceTest {
+class SavepointMetadataDynamicTableSourceTest {
     @Test
-    public void testReadMetadata() throws Exception {
+    void testReadMetadata() throws Exception {
         Configuration config = new Configuration();
         config.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH);
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(config);
@@ -53,9 +53,9 @@ public class SavepointMetadataDynamicTableSourceTest {
         Iterator<Row> it = result.iterator();
         assertThat(it.next().toString())
                 .isEqualTo(
-                        "+I[2, Source: broadcast-source, broadcast-source-uid, 
3a6f51704798c4f418be51bfb6813b77, 1, 128, 0, 0, 0]");
+                        "+I[10, Source: broadcast-source, 
broadcast-source-uid, 3a6f51704798c4f418be51bfb6813b77, 1, 128, 0, 0, 0]");
         assertThat(it.next().toString())
                 .isEqualTo(
-                        "+I[2, keyed-broadcast-process, 
keyed-broadcast-process-uid, 413c1d6f88ee8627fe4b8bc533b4cf1b, 2, 128, 2, 0, 
4548]");
+                        "+I[10, keyed-broadcast-process, 
keyed-broadcast-process-uid, 413c1d6f88ee8627fe4b8bc533b4cf1b, 2, 128, 2, 0, 
4548]");
     }
 }
diff --git 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/SavepointTypeInformationFactoryTest.java
 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/SavepointTypeInformationFactoryTest.java
new file mode 100644
index 00000000000..b8f456a9bc0
--- /dev/null
+++ 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/SavepointTypeInformationFactoryTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.table;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for the SavepointTypeInformationFactory. */
+class SavepointTypeInformationFactoryTest {
+
+    public static class TestLongTypeInformationFactory implements 
SavepointTypeInformationFactory {
+        private static volatile boolean wasCalled = false;
+
+        public static boolean wasFactoryCalled() {
+            return wasCalled;
+        }
+
+        public static void resetCallTracker() {
+            wasCalled = false;
+        }
+
+        @Override
+        public TypeInformation<?> getTypeInformation() {
+            wasCalled = true;
+            return TypeInformation.of(Long.class);
+        }
+    }
+
+    private static class TestStringTypeInformationFactory
+            implements SavepointTypeInformationFactory {
+        @Override
+        public TypeInformation<?> getTypeInformation() {
+            return TypeInformation.of(String.class);
+        }
+    }
+
+    @Test
+    void testSavepointTypeInformationFactoryEndToEnd() throws Exception {
+        TestLongTypeInformationFactory.resetCallTracker();
+
+        Configuration config = new Configuration();
+        config.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH);
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(config);
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        final String sql =
+                "CREATE TABLE state_table (\n"
+                        + "  k bigint,\n"
+                        + "  KeyedPrimitiveValue bigint,\n"
+                        + "  PRIMARY KEY (k) NOT ENFORCED\n"
+                        + ")\n"
+                        + "with (\n"
+                        + "  'connector' = 'savepoint',\n"
+                        + "  'state.path' = 
'src/test/resources/table-state',\n"
+                        + "  'operator.uid' = 'keyed-state-process-uid',\n"
+                        + "  'fields.KeyedPrimitiveValue.value-type-factory' = 
'"
+                        + TestLongTypeInformationFactory.class.getName()
+                        + "'\n"
+                        + ")";
+
+        tEnv.executeSql(sql);
+        Table table = tEnv.sqlQuery("SELECT k, KeyedPrimitiveValue FROM 
state_table");
+        List<Row> result = tEnv.toDataStream(table).executeAndCollect(100);
+
+        assertThat(TestLongTypeInformationFactory.wasFactoryCalled())
+                .as(
+                        "Factory getTypeInformation() method must be called - 
this proves factory is used instead of metadata inference")
+                .isTrue();
+
+        assertThat(result).hasSize(10);
+
+        Set<Long> keys =
+                result.stream().map(r -> (Long) 
r.getField("k")).collect(Collectors.toSet());
+        assertThat(keys).hasSize(10);
+        assertThat(keys).containsExactlyInAnyOrder(0L, 1L, 2L, 3L, 4L, 5L, 6L, 
7L, 8L, 9L);
+
+        Set<Long> primitiveValues =
+                result.stream()
+                        .map(r -> (Long) r.getField("KeyedPrimitiveValue"))
+                        .collect(Collectors.toSet());
+        assertThat(primitiveValues).containsExactly(1L);
+    }
+
+    @Test
+    void testBasicFactoryFunctionality() {
+        TestLongTypeInformationFactory.resetCallTracker();
+
+        TestLongTypeInformationFactory longFactory = new 
TestLongTypeInformationFactory();
+        TypeInformation<?> longTypeInfo = longFactory.getTypeInformation();
+
+        assertThat(longTypeInfo).isEqualTo(TypeInformation.of(Long.class));
+        assertThat(TestLongTypeInformationFactory.wasFactoryCalled()).isTrue();
+
+        TestStringTypeInformationFactory stringFactory = new 
TestStringTypeInformationFactory();
+        TypeInformation<?> stringTypeInfo = stringFactory.getTypeInformation();
+
+        assertThat(stringTypeInfo).isEqualTo(TypeInformation.of(String.class));
+    }
+}
diff --git 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/SpecificAvroSavepointTypeInformationFactory.java
 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/SpecificAvroSavepointTypeInformationFactory.java
deleted file mode 100644
index 8e9e459aff0..00000000000
--- 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/SpecificAvroSavepointTypeInformationFactory.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.state.table;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
-
-import com.example.state.writer.job.schema.avro.AvroRecord;
-
-/** {@link SavepointTypeInformationFactory} for specific avro record. */
-public class SpecificAvroSavepointTypeInformationFactory
-        implements SavepointTypeInformationFactory {
-    @Override
-    public TypeInformation<?> getTypeInformation() {
-        return new AvroTypeInfo<>(AvroRecord.class);
-    }
-}
diff --git 
a/flink-libraries/flink-state-processing-api/src/test/resources/table-state/_metadata
 
b/flink-libraries/flink-state-processing-api/src/test/resources/table-state/_metadata
index dc9d5acbbd2..1bff4e03d5f 100644
Binary files 
a/flink-libraries/flink-state-processing-api/src/test/resources/table-state/_metadata
 and 
b/flink-libraries/flink-state-processing-api/src/test/resources/table-state/_metadata
 differ

Reply via email to