This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new dce9ec9c [Api-Draft] Fix Flink sink type convert error. (#1890)
dce9ec9c is described below
commit dce9ec9c39010c1ba0b71d6c10204f4df9f513ea
Author: TrickyZerg <[email protected]>
AuthorDate: Mon May 16 18:30:57 2022 +0800
[Api-Draft] Fix Flink sink type convert error. (#1890)
* fix Flink sink type convert error
---
.../translation/flink/sink/FlinkSink.java | 62 ++++++++++------------
.../translation/flink/sink/FlinkSinkConverter.java | 7 ++-
2 files changed, 30 insertions(+), 39 deletions(-)
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
index 5fb88060..346eae70 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
@@ -30,11 +30,14 @@ import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import java.io.IOException;
+import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.stream.Collectors;
-public class FlinkSink<InputT, CommT, WriterStateT, GlobalCommT> implements
Sink<InputT, CommT, WriterStateT, GlobalCommT> {
+@SuppressWarnings("unchecked")
+public class FlinkSink<InputT, WriterStateT, CommT, GlobalCommT> implements
Sink<InputT, Serializable, Serializable, Serializable> {
private final SeaTunnelSink<InputT, WriterStateT, CommT, GlobalCommT> sink;
private final Map<String, String> configuration;
@@ -46,66 +49,55 @@ public class FlinkSink<InputT, CommT, WriterStateT, GlobalCommT> implements Sink
}
@Override
- public SinkWriter<InputT, CommT, WriterStateT>
createWriter(org.apache.flink.api.connector.sink.Sink.InitContext context,
List<WriterStateT> states) throws IOException {
+ public SinkWriter<InputT, Serializable, Serializable>
createWriter(org.apache.flink.api.connector.sink.Sink.InitContext context,
List<Serializable> states) throws IOException {
// TODO add subtask and parallelism.
org.apache.seatunnel.api.sink.SinkWriter.Context stContext =
- new DefaultSinkWriterContext(configuration, 0, 0);
+ new DefaultSinkWriterContext(configuration, 0, 0);
- FlinkSinkWriterConverter<InputT, CommT, WriterStateT> converter = new
FlinkSinkWriterConverter<>();
+ FlinkSinkWriterConverter<InputT, Serializable, Serializable> converter
= new FlinkSinkWriterConverter<>();
if (states == null || states.isEmpty()) {
return converter.convert(sink.createWriter(stContext));
} else {
- return converter.convert(sink.restoreWriter(stContext, states));
+ return converter.convert(sink.restoreWriter(stContext,
states.stream().map(s -> (WriterStateT) s).collect(Collectors.toList())));
}
}
@Override
- public Optional<Committer<CommT>> createCommitter() throws IOException {
+ public Optional<Committer<Serializable>> createCommitter() throws
IOException {
- FlinkCommitterConverter<CommT> converter = new
FlinkCommitterConverter<>();
+ FlinkCommitterConverter<Serializable> converter = new
FlinkCommitterConverter<>();
Optional<SinkCommitter<CommT>> committer = sink.createCommitter();
- return committer.map(converter::convert);
+ return committer.map(sinkCommitter ->
converter.convert((SinkCommitter<Serializable>) sinkCommitter));
}
@Override
- public Optional<GlobalCommitter<CommT, GlobalCommT>>
createGlobalCommitter() throws IOException {
- FlinkGlobalCommitterConverter<CommT, GlobalCommT> converter = new
FlinkGlobalCommitterConverter<>();
+ public Optional<GlobalCommitter<Serializable, Serializable>>
createGlobalCommitter() throws IOException {
+ FlinkGlobalCommitterConverter<Serializable, Serializable> converter =
new FlinkGlobalCommitterConverter<>();
Optional<SinkAggregatedCommitter<CommT, GlobalCommT>> committer =
sink.createAggregatedCommitter();
- return committer.map(converter::convert);
+ return committer.map(commTGlobalCommTSinkAggregatedCommitter ->
converter.convert((SinkAggregatedCommitter<Serializable,
+ Serializable>) commTGlobalCommTSinkAggregatedCommitter));
}
@Override
- public Optional<SimpleVersionedSerializer<CommT>>
getCommittableSerializer() {
- if (sink.getCommitInfoSerializer().isPresent()) {
- final FlinkSimpleVersionedSerializerConverter<CommT> converter =
new FlinkSimpleVersionedSerializerConverter<>();
- final Serializer<CommT> commTSerializer =
sink.getCommitInfoSerializer().get();
- return Optional.of(converter.convert(commTSerializer));
- } else {
- return Optional.empty();
- }
+ public Optional<SimpleVersionedSerializer<Serializable>>
getCommittableSerializer() {
+ final FlinkSimpleVersionedSerializerConverter<Serializable> converter
= new FlinkSimpleVersionedSerializerConverter<>();
+ final Optional<Serializer<CommT>> commTSerializer =
sink.getCommitInfoSerializer();
+ return commTSerializer.map(serializer ->
converter.convert((Serializer<Serializable>) serializer));
}
@Override
- public Optional<SimpleVersionedSerializer<GlobalCommT>>
getGlobalCommittableSerializer() {
- if (sink.getAggregatedCommitInfoSerializer().isPresent()) {
- final Serializer<GlobalCommT> globalCommTSerializer =
sink.getAggregatedCommitInfoSerializer().get();
- final FlinkSimpleVersionedSerializerConverter<GlobalCommT>
converter = new FlinkSimpleVersionedSerializerConverter<>();
- return Optional.of(converter.convert(globalCommTSerializer));
- } else {
- return Optional.empty();
- }
+ public Optional<SimpleVersionedSerializer<Serializable>>
getGlobalCommittableSerializer() {
+ final Optional<Serializer<GlobalCommT>> globalCommTSerializer =
sink.getAggregatedCommitInfoSerializer();
+ final FlinkSimpleVersionedSerializerConverter<Serializable> converter
= new FlinkSimpleVersionedSerializerConverter<>();
+ return globalCommTSerializer.map(serializer ->
converter.convert((Serializer<Serializable>) serializer));
}
@Override
- public Optional<SimpleVersionedSerializer<WriterStateT>>
getWriterStateSerializer() {
- if (sink.getWriterStateSerializer().isPresent()) {
- final Serializer<WriterStateT> writerStateTSerializer =
sink.getWriterStateSerializer().get();
- final FlinkSimpleVersionedSerializerConverter<WriterStateT>
converter = new FlinkSimpleVersionedSerializerConverter<>();
- return Optional.of(converter.convert(writerStateTSerializer));
- } else {
- return Optional.empty();
- }
+ public Optional<SimpleVersionedSerializer<Serializable>>
getWriterStateSerializer() {
+ final FlinkSimpleVersionedSerializerConverter<Serializable> converter
= new FlinkSimpleVersionedSerializerConverter<>();
+ final Optional<Serializer<WriterStateT>> writerStateTSerializer =
sink.getWriterStateSerializer();
+ return writerStateTSerializer.map(serializer ->
converter.convert((Serializer<Serializable>) serializer));
}
}
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkConverter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkConverter.java
index 2456b9fe..61e7f67f 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkConverter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkConverter.java
@@ -25,14 +25,13 @@ import org.apache.flink.api.connector.sink.Sink;
import java.util.Map;
public class FlinkSinkConverter<SeaTunnelRowT, FlinkRowT, StateT, CommitInfoT,
AggregatedCommitInfoT>
- implements SinkConverter<
- SeaTunnelSink<SeaTunnelRowT, StateT, CommitInfoT, AggregatedCommitInfoT>,
- Sink<FlinkRowT, StateT, CommitInfoT, AggregatedCommitInfoT>> {
+ implements SinkConverter<SeaTunnelSink<SeaTunnelRowT, StateT,
CommitInfoT,
+ AggregatedCommitInfoT>, Sink<FlinkRowT, StateT, CommitInfoT,
AggregatedCommitInfoT>> {
@Override
@SuppressWarnings("unchecked")
public Sink<FlinkRowT, StateT, CommitInfoT, AggregatedCommitInfoT> convert(
- SeaTunnelSink<SeaTunnelRowT, StateT, CommitInfoT,
AggregatedCommitInfoT> sink, Map<String, String> configuration) {
+ SeaTunnelSink<SeaTunnelRowT, StateT, CommitInfoT,
AggregatedCommitInfoT> sink, Map<String, String> configuration) {
return (Sink<FlinkRowT, StateT, CommitInfoT, AggregatedCommitInfoT>)
new FlinkSink<>(sink, configuration);
}