This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new b023f91  [fix] When doing checkpoint, write cache data to doris to prevent loss (#4)
b023f91 is described below

commit b023f91e28befbc188ad70e18aaf2e58344e11bf
Author: liuyaolin <[email protected]>
AuthorDate: Tue Feb 22 19:19:32 2022 +0800

    [fix] When doing checkpoint, write cache data to doris to prevent loss (#4)
---
 .../java/org/apache/doris/flink/cfg/GenericDorisSinkFunction.java     | 4 ++--
 .../main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java | 4 +++-
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/GenericDorisSinkFunction.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/GenericDorisSinkFunction.java
index 6be6aa4..8615507 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/GenericDorisSinkFunction.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/GenericDorisSinkFunction.java
@@ -53,12 +53,12 @@ public class GenericDorisSinkFunction<T> extends RichSinkFunction<T>
 
     @Override
     public void initializeState(FunctionInitializationContext context) throws Exception {
-
+       
     }
 
     @Override
     public void snapshotState(FunctionSnapshotContext context) throws Exception {
-
+          outputFormat.flush();
     }
 
     @Override
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
index cccdb45..70daf72 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
@@ -24,6 +24,8 @@ import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.OutputFormatProvider;
 import org.apache.flink.types.RowKind;
+import org.apache.doris.flink.cfg.GenericDorisSinkFunction;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
 
 /**
  * DorisDynamicTableSink
@@ -65,7 +67,7 @@ public class DorisDynamicTableSink implements DynamicTableSink {
                 .setExecutionOptions(executionOptions)
                 .setFieldDataTypes(tableSchema.getFieldDataTypes())
                 .setFieldNames(tableSchema.getFieldNames());
-        return OutputFormatProvider.of(builder.build());
+        return SinkFunctionProvider.of(new GenericDorisSinkFunction(builder.build()));
     }
 
     @Override

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to