This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 357dc5fbdac [opt](routine load) support routine load perceived schema change (#39412)
357dc5fbdac is described below
commit 357dc5fbdac533323ac31090490c3085511fb497
Author: hui lai <[email protected]>
AuthorDate: Tue Aug 27 20:29:44 2024 +0800
[opt](routine load) support routine load perceived schema change (#39412)
At present, if the table structure changes, a routine load job cannot perceive it.
As a long-running load, it should be able to pick up changes to the table structure.
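In short: the StreamLoadPlanner is no longer built once in RoutineLoadJob.prepare() and cached for the job's lifetime; each Kafka task now builds a fresh planner from the current table metadata when it re-plans, and passes it to RoutineLoadJob.plan(planner, loadId, txnId), so a rollup or schema change made after the job started takes effect for subsequent tasks. A condensed sketch of the new per-task flow (taken from the KafkaTaskInfo.rePlan hunk below, not a standalone program; the local variable name params is illustrative):

    // Re-plan against the table's current schema for every task, so a rollup
    // or schema change applied after the job started is picked up.
    Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(routineLoadJob.getDbId());
    StreamLoadPlanner planner = new StreamLoadPlanner(db,
            (OlapTable) db.getTableOrMetaException(routineLoadJob.getTableId(), Table.TableType.OLAP),
            routineLoadJob);
    TPipelineFragmentParams params = routineLoadJob.plan(planner, loadId, txnId);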
---
.../load/routineload/KafkaRoutineLoadJob.java | 1 -
.../doris/load/routineload/KafkaTaskInfo.java | 8 +-
.../doris/load/routineload/RoutineLoadJob.java | 19 +--
.../routine_load/test_routine_load_with_sc.out | 4 +
.../data/test_routine_load_with_sc.csv | 1 +
.../routine_load/test_routine_load_with_sc.groovy | 149 +++++++++++++++++++++
6 files changed, 163 insertions(+), 19 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index abd1800a19d..1762f8d1122 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -174,7 +174,6 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
@Override
public void prepare() throws UserException {
- super.prepare();
// should reset converted properties each time the job being prepared.
// because the file info can be changed anytime.
convertCustomProperties(true);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
index dedb66d5f94..52a1ad8559f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
@@ -19,10 +19,12 @@ package org.apache.doris.load.routineload;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.planner.StreamLoadPlanner;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TKafkaLoadInfo;
import org.apache.doris.thrift.TLoadSourceType;
@@ -127,7 +129,11 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
private TPipelineFragmentParams rePlan(RoutineLoadJob routineLoadJob) throws UserException {
TUniqueId loadId = new TUniqueId(id.getMostSignificantBits(), id.getLeastSignificantBits());
// plan for each task, in case table has change(rollup or schema change)
- TPipelineFragmentParams tExecPlanFragmentParams = routineLoadJob.plan(loadId, txnId);
+ Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(routineLoadJob.getDbId());
+ StreamLoadPlanner planner = new StreamLoadPlanner(db,
+ (OlapTable) db.getTableOrMetaException(routineLoadJob.getTableId(),
+ Table.TableType.OLAP), routineLoadJob);
+ TPipelineFragmentParams tExecPlanFragmentParams = routineLoadJob.plan(planner, loadId, txnId);
TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment();
tPlanFragment.getOutputSink().getOlapTableSink().setTxnId(txnId);
// it needs update timeout to make task timeout backoff work
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 2b8cbbd81ac..b983d6beed4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -262,9 +262,6 @@ public abstract class RoutineLoadJob
// The tasks belong to this job
protected List<RoutineLoadTaskInfo> routineLoadTaskInfoList = Lists.newArrayList();
- // stream load planer will be initialized during job schedule
- protected StreamLoadPlanner planner;
-
// this is the origin stmt of CreateRoutineLoadStmt, we use it to persist the RoutineLoadJob,
// because we can not serialize the Expressions contained in job.
@SerializedName("ostmt")
@@ -967,21 +964,9 @@ public abstract class RoutineLoadJob
// call before first scheduling
// derived class can override this.
- public void prepare() throws UserException {
- initPlanner();
- }
-
- private void initPlanner() throws UserException {
- // for multi table load job, the table name is dynamic,we will set table when task scheduling.
- if (isMultiTable) {
- return;
- }
- Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
- planner = new StreamLoadPlanner(db,
- (OlapTable) db.getTableOrMetaException(this.tableId, Table.TableType.OLAP), this);
- }
+ public abstract void prepare() throws UserException;
- public TPipelineFragmentParams plan(TUniqueId loadId, long txnId) throws UserException {
+ public TPipelineFragmentParams plan(StreamLoadPlanner planner, TUniqueId loadId, long txnId) throws UserException {
Preconditions.checkNotNull(planner);
Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
Table table = db.getTableOrMetaException(tableId, Table.TableType.OLAP);
diff --git a/regression-test/data/load_p0/routine_load/test_routine_load_with_sc.out b/regression-test/data/load_p0/routine_load/test_routine_load_with_sc.out
new file mode 100644
index 00000000000..974fdb75559
--- /dev/null
+++ b/regression-test/data/load_p0/routine_load/test_routine_load_with_sc.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql_with_sc --
+1 eab 2023-07-15 def 2023-07-20T05:48:31 aaaaaaaa
+
diff --git a/regression-test/suites/load_p0/routine_load/data/test_routine_load_with_sc.csv b/regression-test/suites/load_p0/routine_load/data/test_routine_load_with_sc.csv
new file mode 100644
index 00000000000..6434a1b032f
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/test_routine_load_with_sc.csv
@@ -0,0 +1 @@
+1,eab,2023-07-15,def,2023-07-20:05:48:31,aaaaaaaa
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_with_sc.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_with_sc.groovy
new file mode 100644
index 00000000000..33c047062dd
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_with_sc.groovy
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_routine_load_with_sc","p0") {
+ def kafkaCsvTpoics = [
+ "test_routine_load_with_sc",
+ ]
+
+ String enabled = context.config.otherConfigs.get("enableKafkaTest")
+ String kafka_port = context.config.otherConfigs.get("kafka_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ def kafka_broker = "${externalEnvIp}:${kafka_port}"
+
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ // define kafka
+ def props = new Properties()
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+ // Create kafka producer
+ def producer = new KafkaProducer<>(props)
+
+ for (String kafkaCsvTopic in kafkaCsvTpoics) {
+ def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+ def lines = txt.readLines()
+ lines.each { line ->
+ logger.info("=====${line}========")
+ def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+ producer.send(record)
+ }
+ }
+ }
+
+ def jobName = "test_routine_load_with_sc_job"
+ def tableName = "test_routine_load_with_sc"
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ try {
+ sql """ DROP TABLE IF EXISTS ${tableName}"""
+ sql """ CREATE TABLE IF NOT EXISTS ${tableName}
+ (
+ `k1` int(20) NULL,
+ `k2` string NULL,
+ `v1` date NULL,
+ `v2` string NULL,
+ `v3` datetime NULL,
+ `v4` varchar(5) NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+ sql "sync"
+
+ sql """
+ CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+ COLUMNS TERMINATED BY ","
+ PROPERTIES
+ (
+ "max_batch_interval" = "5",
+ "max_batch_rows" = "300000",
+ "max_batch_size" = "209715200"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+ "kafka_topic" = "${kafkaCsvTpoics[0]}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+ sql "sync"
+
+ def count = 0
+ while (true) {
+ sleep(1000)
+ def res = sql "show routine load for ${jobName}"
+ def state = res[0][8].toString()
+ if (state != "PAUSED") {
+ count++
+ if (count > 60) {
+ assertEquals(1, 2)
+ }
+ continue;
+ }
+ log.info("reason of state changed:
${res[0][17].toString()}".toString())
+ break;
+ }
+
+ sql "ALTER TABLE ${tableName} MODIFY COLUMN v4 VARCHAR(10)"
+ sql "resume routine load for ${jobName}"
+
+ def props = new Properties()
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+ def producer = new KafkaProducer<>(props)
+ for (String kafkaCsvTopic in kafkaCsvTpoics) {
+ def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+ def lines = txt.readLines()
+ lines.each { line ->
+ logger.info("=====${line}========")
+ def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+ producer.send(record)
+ }
+ }
+
+ while (true) {
+ def res = sql "select count(*) from ${tableName}"
+ def state = sql "show routine load for ${jobName}"
+ log.info("routine load state:
${state[0][8].toString()}".toString())
+ log.info("routine load statistic:
${state[0][14].toString()}".toString())
+ log.info("reason of state changed:
${state[0][17].toString()}".toString())
+ if (res[0][0] > 0) {
+ break
+ }
+ if (count >= 120) {
+ log.error("routine load can not visible for long time")
+ assertEquals(20, res[0][0])
+ break
+ }
+ sleep(5000)
+ count++
+ }
+ qt_sql_with_sc "select * from ${tableName} order by k1"
+ } finally {
+ sql "stop routine load for ${jobName}"
+ sql "DROP TABLE IF EXISTS ${tableName}"
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]