This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new ffb2d86e [api-draft][flink] non-key operator can't get the keyed state
store (#1961)
ffb2d86e is described below
commit ffb2d86eea7ecdda029657bb349628b9fba09793
Author: Zongwen Li <[email protected]>
AuthorDate: Fri May 27 12:32:13 2022 +0800
[api-draft][flink] non-key operator can't get the keyed state store (#1961)
---
.../flink/source/BaseSeaTunnelSourceFunction.java | 23 ++++++++++++----------
1 file changed, 13 insertions(+), 10 deletions(-)
diff --git
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
index 1778596f..57b6e38b 100644
---
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
+++
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
@@ -24,8 +24,8 @@ import
org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
import org.apache.seatunnel.translation.source.BaseSourceFunction;
import org.apache.flink.api.common.state.CheckpointListener;
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -54,7 +54,7 @@ public abstract class BaseSeaTunnelSourceFunction extends
RichSourceFunction<Row
protected final SeaTunnelSource<SeaTunnelRow, ?, ?> source;
protected transient volatile BaseSourceFunction<SeaTunnelRow>
internalSource;
- protected transient MapState<Integer, List<byte[]>> sourceState;
+ protected transient ListState<Map<Integer, List<byte[]>>> sourceState;
protected transient volatile Map<Integer, List<byte[]>> restoredState =
new HashMap<>();
/**
@@ -114,20 +114,23 @@ public abstract class BaseSeaTunnelSourceFunction extends
RichSourceFunction<Row
LOG.debug("snapshotState() called on closed source");
} else {
sourceState.clear();
-
sourceState.putAll(internalSource.snapshotState(snapshotContext.getCheckpointId()));
+
sourceState.add(internalSource.snapshotState(snapshotContext.getCheckpointId()));
}
}
@Override
public void initializeState(FunctionInitializationContext
initializeContext) throws Exception {
- this.sourceState = initializeContext.getKeyedStateStore()
- .getMapState(new MapStateDescriptor<>(
- getStateName(),
- BasicTypeInfo.INT_TYPE_INFO,
-
Types.LIST(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)));
+ this.sourceState = initializeContext.getOperatorStateStore()
+ .getListState(
+ new ListStateDescriptor<>(
+ getStateName(),
+ Types.MAP(
+ BasicTypeInfo.INT_TYPE_INFO,
+
Types.LIST(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO))
+ ));
if (initializeContext.isRestored()) {
// populate actual holder for restored state
- sourceState.entries().forEach(entry ->
restoredState.put(entry.getKey(), entry.getValue()));
+ sourceState.get().forEach(map -> restoredState.putAll(map));
LOG.info("Consumer subtask {} restored state",
getRuntimeContext().getIndexOfThisSubtask());
} else {
LOG.info("Consumer subtask {} has no restore state.",
getRuntimeContext().getIndexOfThisSubtask());