This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 82a19ab [Bug] Fix the state compatibility problem (#257)
82a19ab is described below
commit 82a19ab23211679ef44922f2c913c32d9c3333c1
Author: Tan-JiaLiang <[email protected]>
AuthorDate: Wed Dec 6 11:49:34 2023 +0800
[Bug] Fix the state compatibility problem (#257)
---
.../src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java | 4 ++++
1 file changed, 4 insertions(+)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index d5e5bc1..0033ad0 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -130,6 +130,10 @@ public class DorisWriter<IN> implements
StatefulSink.StatefulSinkWriter<IN, Dori
if(!state.getLabelPrefix().equals(labelPrefix)){
LOG.warn("Label prefix from previous execution {} has changed
to {}.", state.getLabelPrefix(), executionOptions.getLabelPrefix());
}
+ if (state.getDatabase() == null || state.getTable() == null) {
+ LOG.warn("Transactions cannot be aborted when restore because
the last used flink-doris-connector version less than 1.5.0.");
+ continue;
+ }
String key = state.getDatabase() + "." + state.getTable();
DorisStreamLoad streamLoader = getStreamLoader(key);
streamLoader.abortPreCommit(state.getLabelPrefix(),
curCheckpointId);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]