This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 3e0700f [Fix] Fix processedOffset update when retry load (#79)
3e0700f is described below
commit 3e0700f1ae27854380cc763c869d496ef71c7a7e
Author: wudi <[email protected]>
AuthorDate: Wed Jul 2 19:37:58 2025 +0800
[Fix] Fix processedOffset update when retry load (#79)
---
.../doris/kafka/connector/writer/DorisWriter.java | 33 ++++++++++++++++------
1 file changed, 25 insertions(+), 8 deletions(-)
diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java b/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
index 6e1a5a4..cfa3ef9 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java
@@ -46,6 +46,8 @@ public abstract class DorisWriter {
protected final AtomicLong committedOffset; // loaded offset + 1
protected final AtomicLong flushedOffset; // flushed offset
protected final AtomicLong processedOffset; // processed offset
+ protected final AtomicLong skipMinOffset; // skipMinOffset offset
+ protected final AtomicLong skipMaxOffset; // skipMaxOffset offset
protected long previousFlushTimeStamp;
// make the initialization lazy
@@ -86,6 +88,8 @@ public abstract class DorisWriter {
this.processedOffset = new AtomicLong(-1);
this.flushedOffset = new AtomicLong(-1);
this.committedOffset = new AtomicLong(0);
+ this.skipMinOffset = new AtomicLong(-1);
+ this.skipMaxOffset = new AtomicLong(-1);
this.previousFlushTimeStamp = System.currentTimeMillis();
this.dorisOptions = dorisOptions;
@@ -118,6 +122,19 @@ public abstract class DorisWriter {
// discard the record if the record offset is smaller or equal to
server side offset
if (record.kafkaOffset() > this.offsetPersistedInDoris.get()
&& record.kafkaOffset() > processedOffset.get()) {
+
+ if (skipMinOffset.get() != -1 || skipMaxOffset.get() != -1) {
+ LOG.info(
+ "Skip partition {} offset {} to {}, cause these are
smaller than processedOffset, offsetPersistedInDoris={}, processedOffset={}",
+ partition,
+ skipMinOffset.get(),
+ skipMaxOffset.get(),
+ offsetPersistedInDoris.get(),
+ processedOffset.get());
+ skipMinOffset.set(-1);
+ skipMaxOffset.set(-1);
+ }
+
SinkRecord dorisRecord = record;
RecordBuffer tmpBuff = null;
@@ -131,19 +148,19 @@ public abstract class DorisWriter {
if (tmpBuff != null) {
LOG.info(
- "trigger flush by buffer size or count, buffer size:
{}, num of records: {}",
+ "trigger flush by buffer size or count, partition: {},
buffer size: {}, num of records: {}",
+ partition,
tmpBuff.getBufferSizeBytes(),
tmpBuff.getNumOfRecords());
flush(tmpBuff);
+ processedOffset.set(dorisRecord.kafkaOffset());
}
- processedOffset.set(dorisRecord.kafkaOffset());
} else {
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "The record offset is smaller than processedOffset.
recordOffset={}, offsetPersistedInDoris={}, processedOffset={}",
- record.kafkaOffset(),
- offsetPersistedInDoris.get(),
- processedOffset.get());
+ if (skipMinOffset.get() == -1) {
+ skipMinOffset.set(record.kafkaOffset());
+ skipMaxOffset.set(record.kafkaOffset());
+ } else {
+ skipMaxOffset.set(record.kafkaOffset());
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]