This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new dce9ec9c [Api-Draft] Fix Flink sink type convert error. (#1890)
dce9ec9c is described below

commit dce9ec9c39010c1ba0b71d6c10204f4df9f513ea
Author: TrickyZerg <[email protected]>
AuthorDate: Mon May 16 18:30:57 2022 +0800

    [Api-Draft] Fix Flink sink type convert error. (#1890)
    
    * fix Flink sink type convert error
---
 .../translation/flink/sink/FlinkSink.java          | 62 ++++++++++------------
 .../translation/flink/sink/FlinkSinkConverter.java |  7 ++-
 2 files changed, 30 insertions(+), 39 deletions(-)

diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
index 5fb88060..346eae70 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
@@ -30,11 +30,14 @@ import org.apache.flink.api.connector.sink.SinkWriter;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
-public class FlinkSink<InputT, CommT, WriterStateT, GlobalCommT> implements Sink<InputT, CommT, WriterStateT, GlobalCommT> {
+@SuppressWarnings("unchecked")
+public class FlinkSink<InputT, WriterStateT, CommT, GlobalCommT> implements Sink<InputT, Serializable, Serializable, Serializable> {
 
     private final SeaTunnelSink<InputT, WriterStateT, CommT, GlobalCommT> sink;
     private final Map<String, String> configuration;
@@ -46,66 +49,55 @@ public class FlinkSink<InputT, CommT, WriterStateT, GlobalCommT> implements Sink
     }
 
     @Override
-    public SinkWriter<InputT, CommT, WriterStateT> createWriter(org.apache.flink.api.connector.sink.Sink.InitContext context, List<WriterStateT> states) throws IOException {
+    public SinkWriter<InputT, Serializable, Serializable> createWriter(org.apache.flink.api.connector.sink.Sink.InitContext context, List<Serializable> states) throws IOException {
         // TODO add subtask and parallelism.
         org.apache.seatunnel.api.sink.SinkWriter.Context stContext =
-            new DefaultSinkWriterContext(configuration, 0, 0);
+                new DefaultSinkWriterContext(configuration, 0, 0);
 
-        FlinkSinkWriterConverter<InputT, CommT, WriterStateT> converter = new FlinkSinkWriterConverter<>();
+        FlinkSinkWriterConverter<InputT, Serializable, Serializable> converter = new FlinkSinkWriterConverter<>();
 
         if (states == null || states.isEmpty()) {
             return converter.convert(sink.createWriter(stContext));
         } else {
-            return converter.convert(sink.restoreWriter(stContext, states));
+            return converter.convert(sink.restoreWriter(stContext, states.stream().map(s -> (WriterStateT) s).collect(Collectors.toList())));
         }
     }
 
     @Override
-    public Optional<Committer<CommT>> createCommitter() throws IOException {
+    public Optional<Committer<Serializable>> createCommitter() throws IOException {
 
-        FlinkCommitterConverter<CommT> converter = new FlinkCommitterConverter<>();
+        FlinkCommitterConverter<Serializable> converter = new FlinkCommitterConverter<>();
         Optional<SinkCommitter<CommT>> committer = sink.createCommitter();
-        return committer.map(converter::convert);
+        return committer.map(sinkCommitter -> converter.convert((SinkCommitter<Serializable>) sinkCommitter));
 
     }
 
     @Override
-    public Optional<GlobalCommitter<CommT, GlobalCommT>> createGlobalCommitter() throws IOException {
-        FlinkGlobalCommitterConverter<CommT, GlobalCommT> converter = new FlinkGlobalCommitterConverter<>();
+    public Optional<GlobalCommitter<Serializable, Serializable>> createGlobalCommitter() throws IOException {
+        FlinkGlobalCommitterConverter<Serializable, Serializable> converter = new FlinkGlobalCommitterConverter<>();
         Optional<SinkAggregatedCommitter<CommT, GlobalCommT>> committer = sink.createAggregatedCommitter();
-        return committer.map(converter::convert);
+        return committer.map(commTGlobalCommTSinkAggregatedCommitter -> converter.convert((SinkAggregatedCommitter<Serializable,
+                Serializable>) commTGlobalCommTSinkAggregatedCommitter));
     }
 
     @Override
-    public Optional<SimpleVersionedSerializer<CommT>> getCommittableSerializer() {
-        if (sink.getCommitInfoSerializer().isPresent()) {
-            final FlinkSimpleVersionedSerializerConverter<CommT> converter = new FlinkSimpleVersionedSerializerConverter<>();
-            final Serializer<CommT> commTSerializer = sink.getCommitInfoSerializer().get();
-            return Optional.of(converter.convert(commTSerializer));
-        } else {
-            return Optional.empty();
-        }
+    public Optional<SimpleVersionedSerializer<Serializable>> getCommittableSerializer() {
+        final FlinkSimpleVersionedSerializerConverter<Serializable> converter = new FlinkSimpleVersionedSerializerConverter<>();
+        final Optional<Serializer<CommT>> commTSerializer = sink.getCommitInfoSerializer();
+        return commTSerializer.map(serializer -> converter.convert((Serializer<Serializable>) serializer));
     }
 
     @Override
-    public Optional<SimpleVersionedSerializer<GlobalCommT>> getGlobalCommittableSerializer() {
-        if (sink.getAggregatedCommitInfoSerializer().isPresent()) {
-            final Serializer<GlobalCommT> globalCommTSerializer = sink.getAggregatedCommitInfoSerializer().get();
-            final FlinkSimpleVersionedSerializerConverter<GlobalCommT> converter = new FlinkSimpleVersionedSerializerConverter<>();
-            return Optional.of(converter.convert(globalCommTSerializer));
-        } else {
-            return Optional.empty();
-        }
+    public Optional<SimpleVersionedSerializer<Serializable>> getGlobalCommittableSerializer() {
+        final Optional<Serializer<GlobalCommT>> globalCommTSerializer = sink.getAggregatedCommitInfoSerializer();
+        final FlinkSimpleVersionedSerializerConverter<Serializable> converter = new FlinkSimpleVersionedSerializerConverter<>();
+        return globalCommTSerializer.map(serializer -> converter.convert((Serializer<Serializable>) serializer));
     }
 
     @Override
-    public Optional<SimpleVersionedSerializer<WriterStateT>> getWriterStateSerializer() {
-        if (sink.getWriterStateSerializer().isPresent()) {
-            final Serializer<WriterStateT> writerStateTSerializer = sink.getWriterStateSerializer().get();
-            final FlinkSimpleVersionedSerializerConverter<WriterStateT> converter = new FlinkSimpleVersionedSerializerConverter<>();
-            return Optional.of(converter.convert(writerStateTSerializer));
-        } else {
-            return Optional.empty();
-        }
+    public Optional<SimpleVersionedSerializer<Serializable>> getWriterStateSerializer() {
+        final FlinkSimpleVersionedSerializerConverter<Serializable> converter = new FlinkSimpleVersionedSerializerConverter<>();
+        final Optional<Serializer<WriterStateT>> writerStateTSerializer = sink.getWriterStateSerializer();
+        return writerStateTSerializer.map(serializer -> converter.convert((Serializer<Serializable>) serializer));
     }
 }
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkConverter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkConverter.java
index 2456b9fe..61e7f67f 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkConverter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkConverter.java
@@ -25,14 +25,13 @@ import org.apache.flink.api.connector.sink.Sink;
 import java.util.Map;
 
 public class FlinkSinkConverter<SeaTunnelRowT, FlinkRowT, StateT, CommitInfoT, AggregatedCommitInfoT>
-    implements SinkConverter<
-    SeaTunnelSink<SeaTunnelRowT, StateT, CommitInfoT, AggregatedCommitInfoT>,
-    Sink<FlinkRowT, StateT, CommitInfoT, AggregatedCommitInfoT>> {
+        implements SinkConverter<SeaTunnelSink<SeaTunnelRowT, StateT, CommitInfoT,
+        AggregatedCommitInfoT>, Sink<FlinkRowT, StateT, CommitInfoT, AggregatedCommitInfoT>> {
 
     @Override
     @SuppressWarnings("unchecked")
     public Sink<FlinkRowT, StateT, CommitInfoT, AggregatedCommitInfoT> convert(
-        SeaTunnelSink<SeaTunnelRowT, StateT, CommitInfoT, AggregatedCommitInfoT> sink, Map<String, String> configuration) {
+            SeaTunnelSink<SeaTunnelRowT, StateT, CommitInfoT, AggregatedCommitInfoT> sink, Map<String, String> configuration) {
         return (Sink<FlinkRowT, StateT, CommitInfoT, AggregatedCommitInfoT>) new FlinkSink<>(sink, configuration);
 
     }

Reply via email to