This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git
commit 0fa122007be1aacd0e9af0bc505761d8e8473555 Author: wunan1210 <[email protected]> AuthorDate: Sun Aug 22 22:03:32 2021 +0800 [FlinkConnector] Make flink datastream source parameterized (#6473) make flink datastream source parameterized as List<?> instead of Object. --- .../doris/flink/datastream/DorisSourceFunction.java | 16 ++++++++-------- .../deserialization/SimpleListDeserializationSchema.java | 8 +++++--- .../apache/doris/flink/datastream/ScalaValueReader.scala | 2 +- 3 files changed, 14 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java b/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java index 82ab224..08ec5b0 100644 --- a/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java +++ b/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java @@ -36,17 +36,17 @@ import java.util.List; * DorisSource **/ -public class DorisSourceFunction<T> extends RichSourceFunction<T> implements ResultTypeQueryable<T> { +public class DorisSourceFunction extends RichSourceFunction<List<?>> implements ResultTypeQueryable<List<?>> { private static final Logger logger = LoggerFactory.getLogger(DorisSourceFunction.class); - private DorisDeserializationSchema deserializer; - private DorisOptions options; - private DorisReadOptions readOptions; + private final DorisDeserializationSchema<List<?>> deserializer; + private final DorisOptions options; + private final DorisReadOptions readOptions; private List<PartitionDefinition> dorisPartitions; private ScalaValueReader scalaValueReader; - public DorisSourceFunction(DorisStreamOptions streamOptions, DorisDeserializationSchema deserializer) { + public DorisSourceFunction(DorisStreamOptions streamOptions, DorisDeserializationSchema<List<?>> deserializer) { this.deserializer = deserializer; this.options = streamOptions.getOptions(); this.readOptions = streamOptions.getReadOptions(); @@ -59,11 +59,11 @@ public class DorisSourceFunction<T> extends RichSourceFunction<T> implements Res } @Override - public void run(SourceContext sourceContext) throws Exception { + public void run(SourceContext<List<?>> sourceContext) { for (PartitionDefinition partitions : dorisPartitions) { scalaValueReader = new ScalaValueReader(partitions, options, readOptions); while (scalaValueReader.hasNext()) { - Object next = scalaValueReader.next(); + List<?> next = scalaValueReader.next(); sourceContext.collect(next); } } @@ -76,7 +76,7 @@ public class DorisSourceFunction<T> extends RichSourceFunction<T> implements Res @Override - public TypeInformation<T> getProducedType() { + public TypeInformation<List<?>> getProducedType() { return this.deserializer.getProducedType(); } } diff --git a/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java b/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java index 7fcf2f6..d9ec6e5 100644 --- a/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java +++ b/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java @@ -17,15 +17,17 @@ package org.apache.doris.flink.deserialization; +import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import java.util.List; -public class SimpleListDeserializationSchema implements DorisDeserializationSchema { +public class SimpleListDeserializationSchema implements DorisDeserializationSchema<List<?>> { @Override - public TypeInformation getProducedType() { - return TypeInformation.of(List.class); + public TypeInformation<List<?>> getProducedType() { + return TypeInformation.of(new TypeHint<List<?>>() { + }); } } diff --git a/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala b/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala index bdf9487..e69a86f 100644 --- a/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala +++ b/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala @@ -206,7 +206,7 @@ class ScalaValueReader(partition: PartitionDefinition, options: DorisOptions, re * get next value. * @return next value */ - def next: AnyRef = { + def next: java.util.List[_] = { if (!hasNext) { logger.error(SHOULD_NOT_HAPPEN_MESSAGE) throw new ShouldNeverHappenException --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
