gaborgsomogyi commented on code in PR #27340:
URL: https://github.com/apache/flink/pull/27340#discussion_r2840449778


##########
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointDynamicTableSourceFactory.java:
##########
@@ -324,82 +281,25 @@ private SavepointConnectorOptions.StateType 
inferStateType(LogicalType logicalTy
         }
     }
 
-    @Nullable
-    private String inferStateMapKeyFormat(String columnName, LogicalType 
logicalType) {
-        return logicalType.is(LogicalTypeRoot.MAP)
-                ? inferStateValueFormat(columnName, ((MapType) 
logicalType).getKeyType())
-                : null;
-    }
-
-    private String inferStateValueFormat(String columnName, LogicalType 
logicalType) {
-        switch (logicalType.getTypeRoot()) {
-            case CHAR:
-            case VARCHAR:
-                return String.class.getName();
-
-            case BOOLEAN:
-                return Boolean.class.getName();
-
-            case BINARY:
-            case VARBINARY:
-                return byte[].class.getName();
-
-            case DECIMAL:
-                return BigDecimal.class.getName();
-
-            case TINYINT:
-                return Byte.class.getName();
-
-            case SMALLINT:
-                return Short.class.getName();
-
-            case INTEGER:
-                return Integer.class.getName();
-
-            case BIGINT:
-                return Long.class.getName();
-
-            case FLOAT:
-                return Float.class.getName();
-
-            case DOUBLE:
-                return Double.class.getName();
-
-            case DATE:
-                return Integer.class.getName();
-
-            case INTERVAL_YEAR_MONTH:
-            case INTERVAL_DAY_TIME:
-                return Long.class.getName();
-
-            case ARRAY:
-                return inferStateValueFormat(
-                        columnName, ((ArrayType) 
logicalType).getElementType());
-
-            case MAP:
-                return inferStateValueFormat(columnName, ((MapType) 
logicalType).getValueType());
-
-            case NULL:
-                return null;
-
-            case ROW:
-            case MULTISET:
-            case TIME_WITHOUT_TIME_ZONE:
-            case TIMESTAMP_WITHOUT_TIME_ZONE:
-            case TIMESTAMP_WITH_TIME_ZONE:
-            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-            case DISTINCT_TYPE:
-            case STRUCTURED_TYPE:
-            case RAW:
-            case SYMBOL:
-            case UNRESOLVED:
-            case DESCRIPTOR:
-            default:
-                throw new UnsupportedOperationException(
-                        String.format(
-                                "Unable to infer state format for SQL type: %s 
in column: %s. "
-                                        + "Please override the type with the 
following config parameter: %s.%s.%s",
-                                logicalType, columnName, FIELDS, columnName, 
VALUE_CLASS));
+    /**
+     * Preloads all state metadata for an operator in a single I/O operation.
+     *
+     * @param savepointPath Path to the savepoint
+     * @param operatorIdentifier Operator UID or hash
+     * @return Map from state name to StateMetaInfoSnapshot
+     */
+    private Map<String, StateMetaInfoSnapshot> preloadStateMetadata(
+            String savepointPath, OperatorIdentifier operatorIdentifier) {
+        try {
+            return SavepointLoader.loadOperatorStateMetadata(savepointPath, 
operatorIdentifier);
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    String.format(
+                            "Failed to load state metadata from savepoint '%s' 
for operator '%s'. "
+                                    + "Ensure the savepoint path is valid and 
the operator exists in the savepoint. "
+                                    + "Original error: %s",

Review Comment:
   Removed.



##########
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointDynamicTableSourceFactory.java:
##########
@@ -234,6 +154,83 @@ public DynamicTableSource createDynamicTableSource(Context 
context) {
                 rowType);
     }
 
+    private StateValueColumnConfiguration createStateColumnConfiguration(
+            int columnIndex,
+            RowType rowType,
+            Configuration options,
+            Set<ConfigOption<?>> optionalOptions,
+            SavepointTypeInfoResolver typeResolver) {
+
+        RowType.RowField valueRowField = rowType.getFields().get(columnIndex);
+
+        ConfigOption<String> stateNameOption =
+                key(String.format("%s.%s.%s", FIELDS, valueRowField.getName(), 
STATE_NAME))

Review Comment:
   Added.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to