This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 0ee0dd6ae3a [fix](routine load) reset Kafka progress cache when routine load job topic change (#38474) (#39181)
0ee0dd6ae3a is described below
commit 0ee0dd6ae3abb50d56add222443f2be6214650ff
Author: hui lai <[email protected]>
AuthorDate: Sat Aug 10 23:00:39 2024 +0800
[fix](routine load) reset Kafka progress cache when routine load job topic change (#38474) (#39181)
pick (#38474)
When changing the routine load job topic from test_topic_before to test_topic_after with
```
ALTER ROUTINE LOAD FOR test_topic_change FROM KAFKA("kafka_topic" = "test_topic_after");
```
(test_topic_before has 5 rows and test_topic_after has 1 row), the job could not consume any data and kept logging this warning:
```
2024-07-29 15:57:28,122 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,123 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,125 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,126 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,128 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,129 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,131 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,133 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,134 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,136 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,137 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
```
It is necessary to reset the Kafka progress cache when the routine load job topic changes.
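In short, the fix makes the ALTER path validate first and mutate second: check the requested partitions against the cached progress, rebuild the progress cache if the topic changed, and only then apply any explicit offsets. The standalone Java sketch below illustrates that ordering with simplified, hypothetical names (a TopicChangeSketch class and plain maps standing in for KafkaProgress); it is not the Doris implementation, which is the patch shown further down.
```
// Hypothetical, simplified sketch of the ordering introduced by this fix.
// It is NOT the Doris code; KafkaProgress/KafkaRoutineLoadJob carry much more state.
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TopicChangeSketch {
    // stands in for KafkaProgress.partitionIdToOffset of the job being altered
    private Map<Integer, Long> cachedOffsets = new HashMap<>(Map.of(0, 5L));
    private String topic = "test_topic_before";

    // validation only: fail fast while nothing has been mutated yet
    private void checkPartitions(List<Map.Entry<Integer, Long>> newOffsets) {
        for (Map.Entry<Integer, Long> e : newOffsets) {
            if (!cachedOffsets.containsKey(e.getKey())) {
                throw new IllegalArgumentException(
                        "partition " + e.getKey() + " is not in the consumed partitions");
            }
        }
    }

    void alterDataSource(String newTopic, List<Map.Entry<Integer, Long>> newOffsets) {
        checkPartitions(newOffsets);              // 1) check before any mutation
        if (newTopic != null && !newTopic.equals(topic)) {
            topic = newTopic;
            cachedOffsets = new HashMap<>();      // 2) drop stale offsets of the old topic
        }
        for (Map.Entry<Integer, Long> e : newOffsets) {
            cachedOffsets.put(e.getKey(), e.getValue());  // 3) apply explicit offsets last
        }
    }

    public static void main(String[] args) {
        TopicChangeSketch job = new TopicChangeSketch();
        // mirrors the reproducer: only the topic changes, no explicit offsets given,
        // so the stale "offset 5" cache is cleared and consumption can restart
        job.alterDataSource("test_topic_after", List.of());
        System.out.println(job.cachedOffsets);    // {} -> offsets will be re-resolved
    }
}
```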
---
.../doris/load/routineload/KafkaProgress.java | 8 +-
.../load/routineload/KafkaRoutineLoadJob.java | 28 +++--
.../test_routine_load_topic_change.out | 16 +++
.../load_p0/routine_load/data/test_topic_after.csv | 1 +
.../routine_load/data/test_topic_before.csv | 5 +
.../test_routine_load_topic_change.groovy | 140 +++++++++++++++++++++
6 files changed, 186 insertions(+), 12 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
index 53c57a1cceb..7dd49ba1ec7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
@@ -118,15 +118,17 @@ public class KafkaProgress extends RoutineLoadProgress {
}
}
- // modify the partition offset of this progress.
- // throw exception is the specified partition does not exist in progress.
- public void modifyOffset(List<Pair<Integer, Long>> kafkaPartitionOffsets) throws DdlException {
+ public void checkPartitions(List<Pair<Integer, Long>> kafkaPartitionOffsets) throws DdlException {
for (Pair<Integer, Long> pair : kafkaPartitionOffsets) {
if (!partitionIdToOffset.containsKey(pair.first)) {
throw new DdlException("The specified partition " + pair.first
+ " is not in the consumed partitions");
}
}
+ }
+ // modify the partition offset of this progress.
+ // throw exception is the specified partition does not exist in progress.
+ public void modifyOffset(List<Pair<Integer, Long>> kafkaPartitionOffsets) {
for (Pair<Integer, Long> pair : kafkaPartitionOffsets) {
partitionIdToOffset.put(pair.first, pair.second);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 201412027ab..22de3ef3574 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -692,22 +692,32 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
customKafkaProperties = dataSourceProperties.getCustomKafkaProperties();
}
- // modify partition offset first
- if (!kafkaPartitionOffsets.isEmpty()) {
- // we can only modify the partition that is being consumed
- ((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets);
- }
-
+ // convertCustomProperties and check partitions before reset progress to make modify operation atomic
if (!customKafkaProperties.isEmpty()) {
this.customProperties.putAll(customKafkaProperties);
convertCustomProperties(true);
}
- // modify broker list and topic
- if (!Strings.isNullOrEmpty(dataSourceProperties.getBrokerList())) {
- this.brokerList = dataSourceProperties.getBrokerList();
+
+ if (!kafkaPartitionOffsets.isEmpty()) {
+ ((KafkaProgress) progress).checkPartitions(kafkaPartitionOffsets);
}
+
+ // It is necessary to reset the Kafka progress cache if topic change,
+ // and should reset cache before modifying partition offset.
if (!Strings.isNullOrEmpty(dataSourceProperties.getTopic())) {
this.topic = dataSourceProperties.getTopic();
+ this.progress = new KafkaProgress();
+ }
+
+ // modify partition offset
+ if (!kafkaPartitionOffsets.isEmpty()) {
+ // we can only modify the partition that is being consumed
+ ((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets);
+ }
+
+ // modify broker list
+ if (!Strings.isNullOrEmpty(dataSourceProperties.getBrokerList())) {
+ this.brokerList = dataSourceProperties.getBrokerList();
}
}
if (!jobProperties.isEmpty()) {
diff --git a/regression-test/data/load_p0/routine_load/test_routine_load_topic_change.out b/regression-test/data/load_p0/routine_load/test_routine_load_topic_change.out
new file mode 100644
index 00000000000..1f534d0a082
--- /dev/null
+++ b/regression-test/data/load_p0/routine_load/test_routine_load_topic_change.out
@@ -0,0 +1,16 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql_topic_change --
+1 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
+2 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
+3 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
+4 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
+5 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
+
+-- !sql_topic_change1 --
+1 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
+2 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
+3 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
+4 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
+5 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
+6 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
+
diff --git a/regression-test/suites/load_p0/routine_load/data/test_topic_after.csv b/regression-test/suites/load_p0/routine_load/data/test_topic_after.csv
new file mode 100644
index 00000000000..de1727d2d81
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/test_topic_after.csv
@@ -0,0 +1 @@
+6,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/routine_load/data/test_topic_before.csv b/regression-test/suites/load_p0/routine_load/data/test_topic_before.csv
new file mode 100644
index 00000000000..f1a48b1e411
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/test_topic_before.csv
@@ -0,0 +1,5 @@
+1,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
+2,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
+3,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
+4,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
+5,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_topic_change.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_topic_change.groovy
new file mode 100644
index 00000000000..25bf9933d11
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_topic_change.groovy
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_routine_load_topic_change","p0") {
+ // send data to Kafka
+ def kafkaCsvTpoics = [
+ "test_topic_before",
+ "test_topic_after",
+ ]
+ String enabled = context.config.otherConfigs.get("enableKafkaTest")
+ String kafka_port = context.config.otherConfigs.get("kafka_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ def kafka_broker = "${externalEnvIp}:${kafka_port}"
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ // define kafka
+ def props = new Properties()
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+ // Create kafka producer
+ def producer = new KafkaProducer<>(props)
+
+ for (String kafkaCsvTopic in kafkaCsvTpoics) {
+ def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+ def lines = txt.readLines()
+ lines.each { line ->
+ logger.info("=====${line}========")
+ def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+ producer.send(record)
+ }
+ }
+ }
+
+ // test create routine load job with enclose and escape
+ def tableName = "test_routine_load_topic_change"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k1` int(20) NULL,
+ `k2` string NULL,
+ `v1` date NULL,
+ `v2` string NULL,
+ `v3` datetime NULL,
+ `v4` string NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ def jobName = "test_topic_change"
+ try {
+ sql """
+ CREATE ROUTINE LOAD ${jobName} on ${tableName}
+ COLUMNS TERMINATED BY ","
+ PROPERTIES
+ (
+ "max_batch_interval" = "5",
+ "max_batch_rows" = "300000",
+ "max_batch_size" = "209715200"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+ "kafka_topic" = "${kafkaCsvTpoics[0]}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+ sql "sync"
+
+ def count = 0
+ while (true) {
+ def res = sql "select count(*) from ${tableName}"
+ def state = sql "show routine load for ${jobName}"
+ log.info("routine load state:
${state[0][8].toString()}".toString())
+ log.info("routine load statistic:
${state[0][14].toString()}".toString())
+ log.info("reason of state changed:
${state[0][17].toString()}".toString())
+ if (res[0][0] > 0) {
+ break
+ }
+ if (count >= 120) {
+ log.error("routine load can not visible for long time")
+ assertEquals(20, res[0][0])
+ break
+ }
+ sleep(1000)
+ count++
+ }
+ qt_sql_topic_change "select * from ${tableName} order by k1"
+
+ sql "pause routine load for ${jobName}"
+ def res = sql "show routine load for ${jobName}"
+ log.info("routine load job properties:
${res[0][11].toString()}".toString())
+ sql "ALTER ROUTINE LOAD FOR ${jobName} FROM KAFKA(\"kafka_topic\"
= \"${kafkaCsvTpoics[1]}\", \"property.kafka_default_offsets\" =
\"OFFSET_BEGINNING\");"
+ sql "resume routine load for ${jobName}"
+ count = 0
+ while (true) {
+ res = sql "select count(*) from ${tableName}"
+ def state = sql "show routine load for ${jobName}"
+ log.info("routine load state:
${state[0][8].toString()}".toString())
+ log.info("routine load statistic:
${state[0][14].toString()}".toString())
+ log.info("reason of state changed:
${state[0][17].toString()}".toString())
+ if (res[0][0] > 5) {
+ break
+ }
+ if (count >= 120) {
+ log.error("routine load can not visible for long time")
+ assertEquals(20, res[0][0])
+ break
+ }
+ sleep(1000)
+ count++
+ }
+ qt_sql_topic_change1 "select * from ${tableName} order by k1"
+ } finally {
+ sql "stop routine load for ${jobName}"
+ }
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]