This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new ffb2d86e [api-draft][flink] non-key operator can't get the keyed state store (#1961)
ffb2d86e is described below

commit ffb2d86eea7ecdda029657bb349628b9fba09793
Author: Zongwen Li <[email protected]>
AuthorDate: Fri May 27 12:32:13 2022 +0800

    [api-draft][flink] non-key operator can't get the keyed state store (#1961)
---
 .../flink/source/BaseSeaTunnelSourceFunction.java  | 23 ++++++++++++----------
 1 file changed, 13 insertions(+), 10 deletions(-)

diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
index 1778596f..57b6e38b 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
@@ -24,8 +24,8 @@ import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
 import org.apache.seatunnel.translation.source.BaseSourceFunction;
 
 import org.apache.flink.api.common.state.CheckpointListener;
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -54,7 +54,7 @@ public abstract class BaseSeaTunnelSourceFunction extends RichSourceFunction<Row
     protected final SeaTunnelSource<SeaTunnelRow, ?, ?> source;
     protected transient volatile BaseSourceFunction<SeaTunnelRow> internalSource;
 
-    protected transient MapState<Integer, List<byte[]>> sourceState;
+    protected transient ListState<Map<Integer, List<byte[]>>> sourceState;
     protected transient volatile Map<Integer, List<byte[]>> restoredState = new HashMap<>();
 
     /**
@@ -114,20 +114,23 @@ public abstract class BaseSeaTunnelSourceFunction extends RichSourceFunction<Row
             LOG.debug("snapshotState() called on closed source");
         } else {
             sourceState.clear();
-            sourceState.putAll(internalSource.snapshotState(snapshotContext.getCheckpointId()));
+            sourceState.add(internalSource.snapshotState(snapshotContext.getCheckpointId()));
         }
     }
 
     @Override
     public void initializeState(FunctionInitializationContext initializeContext) throws Exception {
-        this.sourceState = initializeContext.getKeyedStateStore()
-            .getMapState(new MapStateDescriptor<>(
-                getStateName(),
-                BasicTypeInfo.INT_TYPE_INFO,
-                Types.LIST(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)));
+        this.sourceState = initializeContext.getOperatorStateStore()
+            .getListState(
+                new ListStateDescriptor<>(
+                    getStateName(),
+                    Types.MAP(
+                        BasicTypeInfo.INT_TYPE_INFO,
+                        Types.LIST(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO))
+                ));
         if (initializeContext.isRestored()) {
             // populate actual holder for restored state
-            sourceState.entries().forEach(entry -> restoredState.put(entry.getKey(), entry.getValue()));
+            sourceState.get().forEach(map -> restoredState.putAll(map));
             LOG.info("Consumer subtask {} restored state", getRuntimeContext().getIndexOfThisSubtask());
         } else {
             LOG.info("Consumer subtask {} has no restore state.", getRuntimeContext().getIndexOfThisSubtask());

Reply via email to