This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git

commit 0fa122007be1aacd0e9af0bc505761d8e8473555
Author: wunan1210 <[email protected]>
AuthorDate: Sun Aug 22 22:03:32 2021 +0800

    [FlinkConnector] Make flink datastream source parameterized (#6473)
    
    make flink datastream source parameterized as List<?> instead of Object.
---
 .../doris/flink/datastream/DorisSourceFunction.java      | 16 ++++++++--------
 .../deserialization/SimpleListDeserializationSchema.java |  8 +++++---
 .../apache/doris/flink/datastream/ScalaValueReader.scala |  2 +-
 3 files changed, 14 insertions(+), 12 deletions(-)

diff --git 
a/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java 
b/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java
index 82ab224..08ec5b0 100644
--- a/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java
+++ b/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java
@@ -36,17 +36,17 @@ import java.util.List;
  * DorisSource
  **/
 
-public class DorisSourceFunction<T> extends RichSourceFunction<T> implements 
ResultTypeQueryable<T> {
+public class DorisSourceFunction extends RichSourceFunction<List<?>> 
implements ResultTypeQueryable<List<?>> {
 
     private static final Logger logger = 
LoggerFactory.getLogger(DorisSourceFunction.class);
 
-    private DorisDeserializationSchema deserializer;
-    private DorisOptions options;
-    private DorisReadOptions readOptions;
+    private final DorisDeserializationSchema<List<?>> deserializer;
+    private final DorisOptions options;
+    private final DorisReadOptions readOptions;
     private List<PartitionDefinition> dorisPartitions;
     private ScalaValueReader scalaValueReader;
 
-    public DorisSourceFunction(DorisStreamOptions streamOptions, 
DorisDeserializationSchema deserializer) {
+    public DorisSourceFunction(DorisStreamOptions streamOptions, 
DorisDeserializationSchema<List<?>> deserializer) {
         this.deserializer = deserializer;
         this.options = streamOptions.getOptions();
         this.readOptions = streamOptions.getReadOptions();
@@ -59,11 +59,11 @@ public class DorisSourceFunction<T> extends 
RichSourceFunction<T> implements Res
     }
 
     @Override
-    public void run(SourceContext sourceContext) throws Exception {
+    public void run(SourceContext<List<?>> sourceContext) {
         for (PartitionDefinition partitions : dorisPartitions) {
             scalaValueReader = new ScalaValueReader(partitions, options, 
readOptions);
             while (scalaValueReader.hasNext()) {
-                Object next = scalaValueReader.next();
+                List<?> next = scalaValueReader.next();
                 sourceContext.collect(next);
             }
         }
@@ -76,7 +76,7 @@ public class DorisSourceFunction<T> extends 
RichSourceFunction<T> implements Res
 
 
     @Override
-    public TypeInformation<T> getProducedType() {
+    public TypeInformation<List<?>> getProducedType() {
         return this.deserializer.getProducedType();
     }
 }
diff --git 
a/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java
 
b/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java
index 7fcf2f6..d9ec6e5 100644
--- 
a/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java
+++ 
b/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java
@@ -17,15 +17,17 @@
 package org.apache.doris.flink.deserialization;
 
 
+import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 
 import java.util.List;
 
 
-public class SimpleListDeserializationSchema implements 
DorisDeserializationSchema {
+public class SimpleListDeserializationSchema implements 
DorisDeserializationSchema<List<?>> {
 
     @Override
-    public TypeInformation getProducedType() {
-        return TypeInformation.of(List.class);
+    public TypeInformation<List<?>> getProducedType() {
+        return TypeInformation.of(new TypeHint<List<?>>() {
+        });
     }
 }
diff --git 
a/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala 
b/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala
index bdf9487..e69a86f 100644
--- a/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala
+++ b/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala
@@ -206,7 +206,7 @@ class ScalaValueReader(partition: PartitionDefinition, 
options: DorisOptions, re
    * get next value.
    * @return next value
    */
-  def next: AnyRef = {
+  def next: java.util.List[_] = {
     if (!hasNext) {
       logger.error(SHOULD_NOT_HAPPEN_MESSAGE)
       throw new ShouldNeverHappenException

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to