This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new b023f91 [fix] When doing checkpoint, write cache data to doris to prevent loss (#4)
b023f91 is described below
commit b023f91e28befbc188ad70e18aaf2e58344e11bf
Author: liuyaolin <[email protected]>
AuthorDate: Tue Feb 22 19:19:32 2022 +0800
[fix] When doing checkpoint, write cache data to doris to prevent loss (#4)
---
.../java/org/apache/doris/flink/cfg/GenericDorisSinkFunction.java | 4 ++--
.../main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java | 4 +++-
2 files changed, 5 insertions(+), 3 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/GenericDorisSinkFunction.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/GenericDorisSinkFunction.java
index 6be6aa4..8615507 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/GenericDorisSinkFunction.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/GenericDorisSinkFunction.java
@@ -53,12 +53,12 @@ public class GenericDorisSinkFunction<T> extends
RichSinkFunction<T>
@Override
public void initializeState(FunctionInitializationContext context) throws
Exception {
-
+
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws
Exception {
-
+ outputFormat.flush();
}
@Override
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
index cccdb45..70daf72 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
@@ -24,6 +24,8 @@ import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.OutputFormatProvider;
import org.apache.flink.types.RowKind;
+import org.apache.doris.flink.cfg.GenericDorisSinkFunction;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
/**
* DorisDynamicTableSink
@@ -65,7 +67,7 @@ public class DorisDynamicTableSink implements
DynamicTableSink {
.setExecutionOptions(executionOptions)
.setFieldDataTypes(tableSchema.getFieldDataTypes())
.setFieldNames(tableSchema.getFieldNames());
- return OutputFormatProvider.of(builder.build());
+ return SinkFunctionProvider.of(new
GenericDorisSinkFunction(builder.build()));
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]