This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new a6f4710bd59 [fix](routine load) reset Kafka progress cache when routine load job topic change (#38474) (#39528)
a6f4710bd59 is described below
commit a6f4710bd5985a859188bda0a36b85643970ef3e
Author: hui lai <[email protected]>
AuthorDate: Mon Aug 26 21:18:13 2024 +0800
[fix](routine load) reset Kafka progress cache when routine load job topic change (#38474) (#39528)
pick (#38474)
When the routine load job's topic is changed from test_topic_before to
test_topic_after with
```
ALTER ROUTINE LOAD FOR test_topic_change FROM KAFKA("kafka_topic" = "test_topic_after");
```
(test_topic_before has 5 rows and test_topic_after has 1 row), the
following warning is logged over and over and the job cannot consume
any data:
```
2024-07-29 15:57:28,122 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,123 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,125 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,126 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,128 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,129 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,131 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,133 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,134 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,136 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
2024-07-29 15:57:28,137 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest offset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615
```
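The cached offset (5) still belongs to the old topic and is ahead of the
new topic's latest offset (1), so the Kafka progress cache must be reset
whenever the routine load job's topic changes.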
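A minimal, self-contained sketch of the failure mode and the fix, assuming a simplified per-partition cache. `ProgressCache`, `commit`, `hasMoreDataToConsume` and `reset` here are hypothetical stand-ins, not the Doris classes, and treating a partition that is missing from the cache as starting at offset 0 is an assumption made only for illustration, not necessarily how the real job initializes partitions.

```java
import java.util.HashMap;
import java.util.Map;

public class TopicChangeSketch {
    // Hypothetical stand-in for KafkaProgress: next offset to consume, per partition.
    static class ProgressCache {
        private Map<Integer, Long> partitionIdToOffset = new HashMap<>();

        // Record that everything before nextOffset has been consumed.
        void commit(int partition, long nextOffset) {
            partitionIdToOffset.put(partition, nextOffset);
        }

        // True if the broker has data past the cached offset.
        // Assumption: a partition missing from the cache starts at offset 0.
        boolean hasMoreDataToConsume(int partition, long latestOffset) {
            long cached = partitionIdToOffset.getOrDefault(partition, 0L);
            if (latestOffset < cached) {
                // The "offset fallback" case from the log: the stale cached
                // offset is ahead of the new topic's end, so nothing is consumed.
                System.out.println("Kafka offset fallback. partition: " + partition
                        + ", cache offset: " + cached + " get latest offset: " + latestOffset);
                return false;
            }
            return cached < latestOffset;
        }

        // The fix: discard every cached offset when the topic changes.
        void reset() {
            partitionIdToOffset = new HashMap<>();
        }
    }

    public static void main(String[] args) {
        ProgressCache cache = new ProgressCache();
        cache.commit(0, 5L); // all 5 rows of test_topic_before consumed

        // Topic switched to test_topic_after (latest offset 1) without a reset.
        boolean withoutReset = cache.hasMoreDataToConsume(0, 1L);

        // Same switch, but with the cache reset first.
        cache.reset();
        boolean withReset = cache.hasMoreDataToConsume(0, 1L);

        System.out.println("without reset: " + withoutReset); // false
        System.out.println("with reset: " + withReset);       // true
    }
}
```

Without the reset, every scheduling round hits the fallback branch again, which matches the repeated warnings above.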
---
.../doris/load/routineload/KafkaProgress.java | 8 ++++---
.../load/routineload/KafkaRoutineLoadJob.java | 28 +++++++++++++++-------
2 files changed, 24 insertions(+), 12 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
index 3b219f48323..8bf2a957eb6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
@@ -118,15 +118,17 @@ public class KafkaProgress extends RoutineLoadProgress {
}
}
- // modify the partition offset of this progress.
- // throw exception is the specified partition does not exist in progress.
- public void modifyOffset(List<Pair<Integer, Long>> kafkaPartitionOffsets) throws DdlException {
+ public void checkPartitions(List<Pair<Integer, Long>> kafkaPartitionOffsets) throws DdlException {
for (Pair<Integer, Long> pair : kafkaPartitionOffsets) {
if (!partitionIdToOffset.containsKey(pair.first)) {
throw new DdlException("The specified partition " + pair.first
+ " is not in the consumed partitions");
}
}
+ }
+ // modify the partition offset of this progress.
+ // throw exception is the specified partition does not exist in progress.
+ public void modifyOffset(List<Pair<Integer, Long>> kafkaPartitionOffsets) {
for (Pair<Integer, Long> pair : kafkaPartitionOffsets) {
partitionIdToOffset.put(pair.first, pair.second);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 692d79d445e..8fedd791b1e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -673,22 +673,32 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
customKafkaProperties = dataSourceProperties.getCustomKafkaProperties();
}
- // modify partition offset first
- if (!kafkaPartitionOffsets.isEmpty()) {
- // we can only modify the partition that is being consumed
- ((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets);
- }
-
+ // convertCustomProperties and check partitions before reset progress to make modify operation atomic
if (!customKafkaProperties.isEmpty()) {
this.customProperties.putAll(customKafkaProperties);
convertCustomProperties(true);
}
- // modify broker list and topic
- if (!Strings.isNullOrEmpty(dataSourceProperties.getBrokerList())) {
- this.brokerList = dataSourceProperties.getBrokerList();
+
+ if (!kafkaPartitionOffsets.isEmpty()) {
+ ((KafkaProgress) progress).checkPartitions(kafkaPartitionOffsets);
}
+
+ // It is necessary to reset the Kafka progress cache if topic change,
+ // and should reset cache before modifying partition offset.
if (!Strings.isNullOrEmpty(dataSourceProperties.getTopic())) {
this.topic = dataSourceProperties.getTopic();
+ this.progress = new KafkaProgress();
+ }
+
+ // modify partition offset
+ if (!kafkaPartitionOffsets.isEmpty()) {
+ // we can only modify the partition that is being consumed
+ ((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets);
+ }
+
+ // modify broker list
+ if (!Strings.isNullOrEmpty(dataSourceProperties.getBrokerList())) {
+ this.brokerList = dataSourceProperties.getBrokerList();
}
}
if (!jobProperties.isEmpty()) {
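The reordering in KafkaRoutineLoadJob follows a validate-then-mutate pattern: every step that can throw (convertCustomProperties, checkPartitions) runs before any field is changed, so a rejected ALTER leaves the job untouched. The sketch below illustrates only that ordering, using a hypothetical `Job` class with a plain `Map<Integer, Long>` in place of KafkaProgress and an `IllegalArgumentException` in place of DdlException; the custom-property conversion is omitted. As in the diff, partitions are checked against the old progress before it is reset for the new topic.

```java
import java.util.HashMap;
import java.util.Map;

public class AtomicAlterSketch {
    // Hypothetical simplified job; a plain map stands in for KafkaProgress.
    static class Job {
        String topic = "test_topic_before";
        Map<Integer, Long> progress = new HashMap<>();

        Job() {
            progress.put(0, 5L); // partition 0 consumed up to offset 5
        }

        // Validation only (cf. checkPartitions): throws before any state changes.
        void checkPartitions(Map<Integer, Long> offsets) {
            for (Integer partition : offsets.keySet()) {
                if (!progress.containsKey(partition)) {
                    throw new IllegalArgumentException("The specified partition " + partition
                            + " is not in the consumed partitions");
                }
            }
        }

        void alter(String newTopic, Map<Integer, Long> offsets) {
            // 1. Run every check that can fail, against the current progress.
            checkPartitions(offsets);

            // 2. Reset progress when the topic changes, before applying offsets.
            if (newTopic != null && !newTopic.isEmpty()) {
                topic = newTopic;
                progress = new HashMap<>();
            }

            // 3. Apply the requested offsets (cf. modifyOffset, which no longer throws).
            progress.putAll(offsets);
        }
    }

    public static void main(String[] args) {
        Job job = new Job();
        try {
            job.alter("test_topic_after", Map.of(7, 0L)); // partition 7 was never consumed
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        // The rejected ALTER left both fields untouched.
        System.out.println(job.topic + " " + job.progress); // test_topic_before {0=5}

        job.alter("test_topic_after", Map.of(0, 0L));
        System.out.println(job.topic + " " + job.progress); // test_topic_after {0=0}
    }
}
```

Had the topic been switched and the progress reset before the check ran, a failed ALTER would already have discarded the old offsets and changed the topic.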