This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 182820f0235 branch-4.0: [enhance](job) add zero-row hint for Kafka
read_committed load and [opt](job) delay Kafka read committed zero-row retries
(#64584)
182820f0235 is described below
commit 182820f02351db2f5f1d0d8c5b1e7b7a5e012b71
Author: hui lai <[email protected]>
AuthorDate: Thu Jun 18 23:40:39 2026 +0800
branch-4.0: [enhance](job) add zero-row hint for Kafka read_committed load
and [opt](job) delay Kafka read committed zero-row retries (#64584)
pick https://github.com/apache/doris/pull/63664; pick
https://github.com/apache/doris/pull/64046
---
.../load/routineload/KafkaRoutineLoadJob.java | 36 +++++-
.../doris/load/routineload/KafkaTaskInfo.java | 6 +
.../load/routineload/RoutineLoadTaskInfo.java | 11 +-
.../load/routineload/KafkaRoutineLoadJobTest.java | 76 +++++++++++
.../test_routine_load_error_info.groovy | 141 ++++++++++++++-------
5 files changed, 220 insertions(+), 50 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index c338dbfeda4..67756bc2b43 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -31,6 +31,7 @@ import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
@@ -89,6 +90,12 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
public static final String KAFKA_FILE_CATALOG = "kafka";
public static final String PROP_GROUP_ID = "group.id";
+ private static final String KAFKA_ISOLATION_LEVEL = "isolation.level";
+ private static final String KAFKA_READ_COMMITTED = "read_committed";
+ private static final String HAS_POSITIVE_LAG_DEBUG_POINT =
"KafkaRoutineLoadJob.hasPositiveLagForTask";
+ private static final String READ_COMMITTED_ZERO_ROWS_WITH_LAG_MESSAGE = "Kafka routine load consumed 0 rows "
+ + "while lag is still positive under isolation.level=read_committed. If the upstream producer uses "
+ + "Kafka transactions, some records may be in uncommitted transactions and are not visible yet.";
@SerializedName("bl")
private String brokerList;
@@ -357,6 +364,33 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
protected void updateProgress(RLTaskTxnCommitAttachment attachment) throws
UserException {
updateProgressAndOffsetsCache(attachment);
super.updateProgress(attachment);
+ updateReadCommittedLagHint(attachment);
+ }
+
+ private void updateReadCommittedLagHint(RLTaskTxnCommitAttachment
attachment) {
+ if (shouldDelayScheduleForReadCommittedZeroRowsWithLag(attachment)) {
+ setOtherMsg(READ_COMMITTED_ZERO_ROWS_WITH_LAG_MESSAGE);
+ }
+ }
+
+ boolean
shouldDelayScheduleForReadCommittedZeroRowsWithLag(RLTaskTxnCommitAttachment
attachment) {
+ return DebugPointUtil.isEnable(HAS_POSITIVE_LAG_DEBUG_POINT)
+ || (attachment.getTotalRows() == 0 && isReadCommitted() &&
hasPositiveLagForTask(attachment));
+ }
+
+ private boolean isReadCommitted() {
+ return
KAFKA_READ_COMMITTED.equalsIgnoreCase(customProperties.get(KAFKA_ISOLATION_LEVEL));
+ }
+
+ private boolean hasPositiveLagForTask(RLTaskTxnCommitAttachment
attachment) {
+ Map<Integer, Long> partitionIdToOffset = ((KafkaProgress)
attachment.getProgress()).getOffsetByPartition();
+ for (Map.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) {
+ Long latestOffset =
cachedPartitionWithLatestOffsets.get(entry.getKey());
+ if (latestOffset != null && latestOffset > entry.getValue() + 1) {
+ return true;
+ }
+ }
+ return false;
}
@Override
@@ -377,7 +411,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
// add new task
KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(oldKafkaTaskInfo,
((KafkaProgress)
progress).getPartitionIdToOffset(oldKafkaTaskInfo.getPartitions()),
isMultiTable());
- kafkaTaskInfo.setDelaySchedule(delaySchedule);
+ kafkaTaskInfo.setDelaySchedule(delaySchedule ||
oldKafkaTaskInfo.isDelaySchedule());
// remove old task
routineLoadTaskInfoList.remove(routineLoadTaskInfo);
// add new task
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
index 5e5c7158b71..eec36a5af9b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
@@ -159,6 +159,12 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
return routineLoadJob.hasMoreDataToConsume(id, partitionIdToOffset);
}
+ @Override
+ protected boolean shouldDelaySchedule(RLTaskTxnCommitAttachment
rlTaskTxnCommitAttachment) {
+ KafkaRoutineLoadJob routineLoadJob = (KafkaRoutineLoadJob)
routineLoadManager.getJob(jobId);
+ return
routineLoadJob.shouldDelayScheduleForReadCommittedZeroRowsWithLag(rlTaskTxnCommitAttachment);
+ }
+
private TPipelineFragmentParams rePlan(RoutineLoadJob routineLoadJob)
throws UserException {
TUniqueId loadId = new TUniqueId(id.getMostSignificantBits(),
id.getLeastSignificantBits());
// plan for each task, in case table has change(rollup or schema
change)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index efdfb4586db..6e03ff01841 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -158,6 +158,10 @@ public abstract class RoutineLoadTaskInfo {
return isEof;
}
+ public boolean isDelaySchedule() {
+ return delaySchedule;
+ }
+
public boolean needDedalySchedule() {
return delaySchedule || isEof;
}
@@ -178,6 +182,7 @@ public abstract class RoutineLoadTaskInfo {
public void handleTaskByTxnCommitAttachment(RLTaskTxnCommitAttachment
rlTaskTxnCommitAttachment) {
judgeEof(rlTaskTxnCommitAttachment);
+ this.delaySchedule = shouldDelaySchedule(rlTaskTxnCommitAttachment);
}
private void judgeEof(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment)
{
@@ -195,7 +200,11 @@ public abstract class RoutineLoadTaskInfo {
}
}
- abstract TRoutineLoadTask createRoutineLoadTask() throws UserException;
+ protected boolean shouldDelaySchedule(RLTaskTxnCommitAttachment
rlTaskTxnCommitAttachment) {
+ return false;
+ }
+
+ protected abstract TRoutineLoadTask createRoutineLoadTask() throws
UserException;
public void updateAdaptiveTimeout(RoutineLoadJob routineLoadJob) {
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
index 5113c57ec88..b6944df5539 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
@@ -34,6 +34,7 @@ import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.common.jmockit.FieldReflection;
import org.apache.doris.datasource.kafka.KafkaUtil;
import org.apache.doris.load.RoutineLoadDesc;
import org.apache.doris.load.loadv2.LoadTask;
@@ -231,6 +232,81 @@ public class KafkaRoutineLoadJobTest {
Assert.assertEquals(15L, routineLoadJob.totalLag().longValue());
}
+ @Test
+ public void
testUpdateProgressWarnsWhenReadCommittedTaskHasZeroRowsAndLag() throws
UserException {
+ KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L,
"kafka_routine_load_job", 1L,
+ 1L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN);
+ Map<String, String> customProperties = Maps.newHashMap();
+ customProperties.put("isolation.level", "read_committed");
+ Deencapsulation.setField(routineLoadJob, "customProperties",
customProperties);
+
+ Map<Integer, Long> cachedPartitionWithLatestOffsets =
Maps.newHashMap();
+ cachedPartitionWithLatestOffsets.put(1, 20L);
+ Deencapsulation.setField(routineLoadJob,
"cachedPartitionWithLatestOffsets",
+ cachedPartitionWithLatestOffsets);
+
+ Map<Integer, Long> taskProgress = Maps.newHashMap();
+ taskProgress.put(1, 10L);
+ RLTaskTxnCommitAttachment attachment = new RLTaskTxnCommitAttachment();
+ Deencapsulation.setField(attachment, "progress", new
KafkaProgress(taskProgress));
+
+ Deencapsulation.invoke(routineLoadJob, "updateProgress", attachment);
+
+ String otherMsg = Deencapsulation.getField(routineLoadJob, "otherMsg");
+ Assert.assertTrue(otherMsg.contains("some records may be in uncommitted transactions"));
+ }
+
+ @Test
+ public void testReadCommittedZeroRowsWithLagDelaysNextTask(
+ @Injectable RoutineLoadManager routineLoadManager) throws
UserException {
+ KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L,
"kafka_routine_load_job", 1L,
+ 1L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN);
+ new Expectations() {
+ {
+ routineLoadManager.getJob(1L);
+ minTimes = 0;
+ result = routineLoadJob;
+ }
+ };
+
+ Map<String, String> customProperties = Maps.newHashMap();
+ customProperties.put("isolation.level", "read_committed");
+ Deencapsulation.setField(routineLoadJob, "customProperties",
customProperties);
+
+ Map<Integer, Long> cachedPartitionWithLatestOffsets =
Maps.newHashMap();
+ cachedPartitionWithLatestOffsets.put(1, 20L);
+ Deencapsulation.setField(routineLoadJob,
"cachedPartitionWithLatestOffsets",
+ cachedPartitionWithLatestOffsets);
+
+ Map<Integer, Long> taskProgress = Maps.newHashMap();
+ taskProgress.put(1, 10L);
+ Deencapsulation.setField(routineLoadJob, "progress", new
KafkaProgress(taskProgress));
+
+ KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(new UUID(1, 1), 1L,
20000,
+ taskProgress, false, 1000, false);
+ FieldReflection.setField(RoutineLoadTaskInfo.class, kafkaTaskInfo,
"routineLoadManager",
+ routineLoadManager);
+ FieldReflection.setField(KafkaTaskInfo.class, kafkaTaskInfo,
"routineLoadManager",
+ routineLoadManager);
+ List<RoutineLoadTaskInfo> routineLoadTaskInfoList = new ArrayList<>();
+ routineLoadTaskInfoList.add(kafkaTaskInfo);
+ Deencapsulation.setField(routineLoadJob, "routineLoadTaskInfoList",
routineLoadTaskInfoList);
+
+ RLTaskTxnCommitAttachment attachment = new RLTaskTxnCommitAttachment();
+ Deencapsulation.setField(attachment, "progress", new
KafkaProgress(taskProgress));
+ Deencapsulation.setField(attachment, "taskExecutionTimeMs",
+ routineLoadJob.getMaxBatchIntervalS() * 1000);
+
+ kafkaTaskInfo.handleTaskByTxnCommitAttachment(attachment);
+
+ Assert.assertFalse(kafkaTaskInfo.getIsEof());
+ Assert.assertTrue(kafkaTaskInfo.needDedalySchedule());
+
+ RoutineLoadTaskInfo newTask = Deencapsulation.invoke(routineLoadJob,
+ "unprotectRenewTask", kafkaTaskInfo, false);
+ Assert.assertTrue(newTask.needDedalySchedule());
+ }
+
@Test
public void testUpdateLagRebuildsConvertedPropertiesAfterReplay(@Mocked
Env env) throws UserException {
KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L,
"kafka_routine_load_job", 1L,
diff --git
a/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy
b/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy
index 2715b8a45ca..260824a05eb 100644
---
a/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy
+++
b/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy
@@ -15,9 +15,9 @@
// specific language governing permissions and limitations
// under the License.
+import org.apache.doris.regression.util.RoutineLoadTestUtils
import org.apache.kafka.clients.admin.AdminClient
-import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.clients.producer.ProducerConfig
suite("test_routine_load_error_info","nonConcurrent") {
@@ -30,52 +30,19 @@ suite("test_routine_load_error_info","nonConcurrent") {
"test_error_info",
]
- String enabled = context.config.otherConfigs.get("enableKafkaTest")
- String kafka_port = context.config.otherConfigs.get("kafka_port")
- String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
- def kafka_broker = "${externalEnvIp}:${kafka_port}"
+ def kafkaTestEnabled = RoutineLoadTestUtils.isKafkaTestEnabled(context)
+ def kafka_broker = RoutineLoadTestUtils.getKafkaBroker(context)
// send data to kafka
- if (enabled != null && enabled.equalsIgnoreCase("true")) {
- def props = new Properties()
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"${kafka_broker}".toString())
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
- // add timeout config
- props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
- props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
-
- // check conenction
- def verifyKafkaConnection = { prod ->
- try {
- logger.info("=====try to connect Kafka========")
- def partitions =
prod.partitionsFor("__connection_verification_topic")
- return partitions != null
- } catch (Exception e) {
- throw new Exception("Kafka connect fail: ${e.message}".toString())
- }
- }
- // Create kafka producer
- def producer = new KafkaProducer<>(props)
+ if (kafkaTestEnabled) {
+ def producer = RoutineLoadTestUtils.createKafkaProducer(kafka_broker)
try {
- logger.info("Kafka connecting: ${kafka_broker}")
- if (!verifyKafkaConnection(producer)) {
- throw new Exception("can't get any kafka info")
+ for (String kafkaCsvTopic in kafkaCsvTpoics) {
+ def txt = new
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+ RoutineLoadTestUtils.sendTestDataToKafka(producer,
[kafkaCsvTopic], txt.readLines())
}
- } catch (Exception e) {
- logger.error("FATAL: " + e.getMessage())
+ } finally {
producer.close()
- throw e
- }
- logger.info("Kafka connect success")
- for (String kafkaCsvTopic in kafkaCsvTpoics) {
- def txt = new
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
- def lines = txt.readLines()
- lines.each { line ->
- logger.info("=====${line}========")
- def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
- producer.send(record)
- }
}
}
@@ -165,15 +132,47 @@ suite("test_routine_load_error_info","nonConcurrent") {
)
FROM KAFKA
(
- "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+ "kafka_broker_list" = "${kafka_broker}",
"kafka_topic" = "${kafkaTopic}",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
"""
}
+ def createReadCommittedJob = {jobName, tableName, kafkaTopic ->
+ sql """
+ CREATE ROUTINE LOAD ${jobName} on ${tableName}
+ COLUMNS(k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18),
+ COLUMNS TERMINATED BY "|"
+ PROPERTIES
+ (
+ "max_batch_interval" = "5",
+ "max_batch_rows" = "300000",
+ "max_batch_size" = "209715200"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${kafka_broker}",
+ "kafka_topic" = "${kafkaTopic}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING",
+ "property.isolation.level" = "read_committed"
+ );
+ """
+ }
+
+ def createKafkaTopic = {kafkaTopic ->
+ def adminProps = new Properties()
+ adminProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"${kafka_broker}".toString())
+ def adminClient = AdminClient.create(adminProps)
+ try {
+ adminClient.createTopics([new NewTopic(kafkaTopic, 1, (short)
1)]).all().get()
+ } finally {
+ adminClient.close()
+ }
+ }
+
// case 1: task failed
- if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ if (kafkaTestEnabled) {
// create table
def jobName = "test_error_info"
def tableName = "test_routine_error_info"
@@ -209,7 +208,7 @@ suite("test_routine_load_error_info","nonConcurrent") {
}
// case 2: reschedule job
- if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ if (kafkaTestEnabled) {
def jobName = "test_error_info"
def tableName = "test_routine_error_info"
try {
@@ -242,7 +241,7 @@ suite("test_routine_load_error_info","nonConcurrent") {
}
// case 3: memory limit
- if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ if (kafkaTestEnabled) {
def jobName = "test_memory_limit_error_info"
def tableName = "test_routine_memory_limit_error_info"
@@ -276,4 +275,50 @@ suite("test_routine_load_error_info","nonConcurrent") {
sql "DROP TABLE IF EXISTS ${tableName}"
}
}
-}
\ No newline at end of file
+
+ // case 4: read_committed lag hint
+ if (kafkaTestEnabled) {
+ def jobName = "test_read_committed_lag_error_info"
+ def tableName = "test_routine_read_committed_lag_error_info"
+ def kafkaTopic =
"test_read_committed_lag_error_info_${System.currentTimeMillis()}"
+ def debugPoint = "KafkaRoutineLoadJob.hasPositiveLagForTask"
+
+ try {
+ createKafkaTopic(kafkaTopic)
+ def producer =
RoutineLoadTestUtils.createKafkaProducer(kafka_broker)
+ try {
+ def txt = new
File("""${context.file.parent}/data/${kafkaCsvTpoics[0]}.csv""").text
+ RoutineLoadTestUtils.sendTestDataToKafka(producer,
[kafkaTopic], txt.readLines())
+ } finally {
+ producer.close()
+ }
+ createTable(tableName)
+ sql "sync"
+ GetDebugPoint().enableDebugPointForAllFEs(debugPoint)
+ createReadCommittedJob(jobName, tableName, kafkaTopic)
+ sql "sync"
+
+ // check error info
+ def count = 0
+ while (true) {
+ def res = sql "show routine load for ${jobName}"
+ log.info("show routine load: ${res[0].toString()}".toString())
+ log.info("other msg: ${res[0][19].toString()}".toString())
+ if (res[0][19].toString() != "") {
+ assertTrue(res[0][19].toString().contains("some records may be in uncommitted transactions"))
+ break;
+ }
+ count++
+ if (count > 60) {
+ assertEquals(1, 2)
+ break;
+ }
+ sleep(1000)
+ }
+ } finally {
+ GetDebugPoint().disableDebugPointForAllFEs(debugPoint)
+ sql "stop routine load for ${jobName}"
+ sql "DROP TABLE IF EXISTS ${tableName}"
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]