This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 7ba66c58909 [branch-2.1](routine-load) do not schedule task when there
is no data (#34654)
7ba66c58909 is described below
commit 7ba66c58909101e3af18646fcd5e751784f9fa97
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Sat May 11 11:01:18 2024 +0800
[branch-2.1](routine-load) do not schedule task when there is no data
(#34654)
---
be/src/runtime/routine_load/data_consumer.cpp | 45 +++++
be/src/runtime/routine_load/data_consumer.h | 3 +
.../routine_load/routine_load_task_executor.cpp | 23 +++
.../routine_load/routine_load_task_executor.h | 4 +
be/src/service/internal_service.cpp | 18 +-
.../apache/doris/datasource/kafka/KafkaUtil.java | 76 +++++++
.../load/routineload/KafkaRoutineLoadJob.java | 37 ++--
.../load/routineload/KafkaRoutineLoadJobTest.java | 15 ++
.../doris/load/routineload/RoutineLoadJobTest.java | 15 ++
gensrc/proto/internal_service.proto | 2 +
.../routine_load/test_routine_load_schedule.out | 23 +++
.../load_p0/routine_load/data/test_schedule.csv | 20 ++
.../routine_load/test_routine_load_schedule.groovy | 222 +++++++++++++++++++++
13 files changed, 489 insertions(+), 14 deletions(-)
diff --git a/be/src/runtime/routine_load/data_consumer.cpp
b/be/src/runtime/routine_load/data_consumer.cpp
index ccf5fb4cb25..9c40e85e281 100644
--- a/be/src/runtime/routine_load/data_consumer.cpp
+++ b/be/src/runtime/routine_load/data_consumer.cpp
@@ -415,6 +415,51 @@ Status
KafkaDataConsumer::get_latest_offsets_for_partitions(
return Status::OK();
}
+Status KafkaDataConsumer::get_real_offsets_for_partitions(
+ const std::vector<PIntegerPair>& offset_flags,
std::vector<PIntegerPair>* offsets,
+ int timeout) {
+ MonotonicStopWatch watch;
+ watch.start();
+ for (const auto& entry : offset_flags) {
+ PIntegerPair pair;
+ if (UNLIKELY(entry.val() >= 0)) {
+ pair.set_key(entry.key());
+ pair.set_val(entry.val());
+ offsets->push_back(std::move(pair));
+ continue;
+ }
+
+ int64_t low = 0;
+ int64_t high = 0;
+ auto timeout_ms = timeout - static_cast<int>(watch.elapsed_time() /
1000 / 1000);
+ if (UNLIKELY(timeout_ms <= 0)) {
+ return Status::InternalError("get kafka real offsets for
partitions timeout");
+ }
+
+ RdKafka::ErrorCode err =
+ _k_consumer->query_watermark_offsets(_topic, entry.key(),
&low, &high, timeout_ms);
+ if (UNLIKELY(err != RdKafka::ERR_NO_ERROR)) {
+ std::stringstream ss;
+ ss << "failed to get latest offset for partition: " << entry.key()
+ << ", err: " << RdKafka::err2str(err);
+ LOG(WARNING) << ss.str();
+ return Status::InternalError(ss.str());
+ }
+
+ pair.set_key(entry.key());
+ if (entry.val() == -1) {
+ // OFFSET_END_VAL = -1
+ pair.set_val(high);
+ } else if (entry.val() == -2) {
+ // OFFSET_BEGINNING_VAL = -2
+ pair.set_val(low);
+ }
+ offsets->push_back(std::move(pair));
+ }
+
+ return Status::OK();
+}
+
Status KafkaDataConsumer::cancel(std::shared_ptr<StreamLoadContext> ctx) {
std::unique_lock<std::mutex> l(_lock);
if (!_init) {
diff --git a/be/src/runtime/routine_load/data_consumer.h
b/be/src/runtime/routine_load/data_consumer.h
index f6c10467786..7ae30b51729 100644
--- a/be/src/runtime/routine_load/data_consumer.h
+++ b/be/src/runtime/routine_load/data_consumer.h
@@ -155,6 +155,9 @@ public:
// get latest offsets for partitions
Status get_latest_offsets_for_partitions(const std::vector<int32_t>&
partition_ids,
std::vector<PIntegerPair>*
offsets, int timeout);
+ // get offsets for times
+ Status get_real_offsets_for_partitions(const std::vector<PIntegerPair>&
offset_flags,
+ std::vector<PIntegerPair>* offsets,
int timeout);
private:
std::string _brokers;
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp
b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index 70441552542..0534531aed2 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -174,6 +174,29 @@ Status
RoutineLoadTaskExecutor::get_kafka_latest_offsets_for_partitions(
return st;
}
+Status RoutineLoadTaskExecutor::get_kafka_real_offsets_for_partitions(
+ const PKafkaMetaProxyRequest& request, std::vector<PIntegerPair>*
partition_offsets,
+ int timeout) {
+ CHECK(request.has_kafka_info());
+
+ // This context is meaningless, just for unifing the interface
+ std::shared_ptr<StreamLoadContext> ctx =
std::make_shared<StreamLoadContext>(_exec_env);
+ RETURN_IF_ERROR(_prepare_ctx(request, ctx));
+
+ std::shared_ptr<DataConsumer> consumer;
+ RETURN_IF_ERROR(_data_consumer_pool.get_consumer(ctx, &consumer));
+
+ Status st =
+
std::static_pointer_cast<KafkaDataConsumer>(consumer)->get_real_offsets_for_partitions(
+ std::vector<PIntegerPair>(request.offset_flags().begin(),
+ request.offset_flags().end()),
+ partition_offsets, timeout);
+ if (st.ok()) {
+ _data_consumer_pool.return_consumer(consumer);
+ }
+ return st;
+}
+
Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
std::unique_lock<std::mutex> l(_lock);
if (_task_map.find(task.id) != _task_map.end()) {
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h
b/be/src/runtime/routine_load/routine_load_task_executor.h
index e75c8eb8811..8f8a6f2d653 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.h
+++ b/be/src/runtime/routine_load/routine_load_task_executor.h
@@ -67,6 +67,10 @@ public:
std::vector<PIntegerPair>*
partition_offsets,
int timeout);
+ Status get_kafka_real_offsets_for_partitions(const PKafkaMetaProxyRequest&
request,
+ std::vector<PIntegerPair>*
partition_offsets,
+ int timeout);
+
private:
// execute the task
void exec_task(std::shared_ptr<StreamLoadContext> ctx, DataConsumerPool*
pool,
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index 30b948f1d29..6abc972634a 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -1176,7 +1176,23 @@ void
PInternalServiceImpl::get_info(google::protobuf::RpcController* controller,
int timeout_ms = request->has_timeout_secs() ? request->timeout_secs()
* 1000 : 5 * 1000;
if (request->has_kafka_meta_request()) {
const PKafkaMetaProxyRequest& kafka_request =
request->kafka_meta_request();
- if (!kafka_request.partition_id_for_latest_offsets().empty()) {
+ if (!kafka_request.offset_flags().empty()) {
+ std::vector<PIntegerPair> partition_offsets;
+ Status st = _exec_env->routine_load_task_executor()
+ ->get_kafka_real_offsets_for_partitions(
+ request->kafka_meta_request(),
&partition_offsets,
+ timeout_ms);
+ if (st.ok()) {
+ PKafkaPartitionOffsets* part_offsets =
response->mutable_partition_offsets();
+ for (const auto& entry : partition_offsets) {
+ PIntegerPair* res = part_offsets->add_offset_times();
+ res->set_key(entry.key());
+ res->set_val(entry.val());
+ }
+ }
+ st.to_protobuf(response->mutable_status());
+ return;
+ } else if
(!kafka_request.partition_id_for_latest_offsets().empty()) {
// get latest offsets for specified partition ids
std::vector<PIntegerPair> partition_offsets;
Status st = _exec_env->routine_load_task_executor()
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
index d2184229bb4..00169f4c8ea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
@@ -31,6 +31,7 @@ import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -217,4 +218,79 @@ public class KafkaUtil {
"Failed to get latest offsets of kafka topic: " + topic +
". error: " + e.getMessage());
}
}
+
+ public static List<Pair<Integer, Long>> getRealOffsets(String brokerList,
String topic,
+ Map<String,
String> convertedCustomProperties,
+
List<Pair<Integer, Long>> offsets)
+ throws
LoadException {
+ // filter values greater than 0 as these offsets is real offset
+ // only update offset like OFFSET_BEGINNING or OFFSET_END
+ List<Pair<Integer, Long>> offsetFlags = new ArrayList<>();
+ List<Pair<Integer, Long>> realOffsets = new ArrayList<>();
+ for (Pair<Integer, Long> pair : offsets) {
+ if (pair.second < 0) {
+ offsetFlags.add(pair);
+ } else {
+ realOffsets.add(pair);
+ }
+ }
+ if (offsetFlags.size() == 0) {
+ LOG.info("do not need update and directly return offsets for
partitions {} in topic: {}", offsets, topic);
+ return offsets;
+ }
+
+ TNetworkAddress address = null;
+ try {
+ List<Long> backendIds =
Env.getCurrentSystemInfo().getAllBackendIds(true);
+ if (backendIds.isEmpty()) {
+ throw new LoadException("Failed to get real offsets. No alive
backends");
+ }
+ Collections.shuffle(backendIds);
+ Backend be =
Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
+ address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
+
+ // create request
+ InternalService.PKafkaMetaProxyRequest.Builder metaRequestBuilder =
+ InternalService.PKafkaMetaProxyRequest.newBuilder()
+
.setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
+ .setBrokers(brokerList)
+ .setTopic(topic)
+ .addAllProperties(
+
convertedCustomProperties.entrySet().stream().map(
+ e ->
InternalService.PStringPair.newBuilder()
+ .setKey(e.getKey())
+
.setVal(e.getValue())
+ .build()
+ ).collect(Collectors.toList())
+ )
+ );
+ for (Pair<Integer, Long> pair : offsetFlags) {
+
metaRequestBuilder.addOffsetFlags(InternalService.PIntegerPair.newBuilder().setKey(pair.first)
+ .setVal(pair.second).build());
+ }
+ InternalService.PProxyRequest request =
InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
+
metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
+
+ // get info
+ Future<InternalService.PProxyResult> future =
BackendServiceProxy.getInstance().getInfo(address, request);
+ InternalService.PProxyResult result =
future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+ TStatusCode code =
TStatusCode.findByValue(result.getStatus().getStatusCode());
+ if (code != TStatusCode.OK) {
+ throw new UserException("failed to get real offsets: " +
result.getStatus().getErrorMsgsList());
+ } else {
+ List<InternalService.PIntegerPair> pairs =
result.getPartitionOffsets().getOffsetTimesList();
+ List<Pair<Integer, Long>> partitionOffsets =
Lists.newArrayList();
+ for (InternalService.PIntegerPair pair : pairs) {
+ partitionOffsets.add(Pair.of(pair.getKey(),
pair.getVal()));
+ }
+ realOffsets.addAll(partitionOffsets);
+ LOG.info("finish to get real offsets for partitions {} in
topic: {}", realOffsets, topic);
+ return realOffsets;
+ }
+ } catch (Exception e) {
+ LOG.warn("failed to get real offsets.", e);
+ throw new LoadException(
+ "Failed to get real offsets of kafka topic: " + topic + ".
error: " + e.getMessage());
+ }
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index c00f16b7d8a..43ae98d8f7a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -512,17 +512,20 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
for (Integer kafkaPartition : newPartitions) {
partitionOffsets.add(Pair.of(kafkaPartition, beginOffset));
}
- if (isOffsetForTimes()) {
- try {
+ try {
+ if (isOffsetForTimes()) {
partitionOffsets =
KafkaUtil.getOffsetsForTimes(this.brokerList,
this.topic, convertedCustomProperties,
partitionOffsets);
- } catch (LoadException e) {
- LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
- .add("partition:timestamp",
Joiner.on(",").join(partitionOffsets))
- .add("error_msg", "Job failed to fetch current offsets
from times with error " + e.getMessage())
- .build(), e);
- throw new UserException(e);
+ } else {
+ partitionOffsets = KafkaUtil.getRealOffsets(this.brokerList,
+ this.topic, convertedCustomProperties,
partitionOffsets);
}
+ } catch (LoadException e) {
+ LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
+ .add("partition:", Joiner.on(",").join(partitionOffsets))
+ .add("error_msg", "Job failed to fetch current offsets
with error " + e.getMessage())
+ .build(), e);
+ throw new UserException(e);
}
return partitionOffsets;
}
@@ -552,6 +555,10 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
kafkaPartitionOffsets =
KafkaUtil.getOffsetsForTimes(kafkaDataSourceProperties.getBrokerList(),
kafkaDataSourceProperties.getTopic(),
convertedCustomProperties,
kafkaDataSourceProperties.getKafkaPartitionOffsets());
+ } else {
+ kafkaPartitionOffsets =
KafkaUtil.getRealOffsets(kafkaDataSourceProperties.getBrokerList(),
+ kafkaDataSourceProperties.getTopic(),
+ convertedCustomProperties,
kafkaDataSourceProperties.getKafkaPartitionOffsets());
}
for (Pair<Integer, Long> partitionOffset : kafkaPartitionOffsets) {
@@ -638,9 +645,9 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
public void modifyProperties(AlterRoutineLoadStmt stmt) throws
UserException {
Map<String, String> jobProperties = stmt.getAnalyzedJobProperties();
KafkaDataSourceProperties dataSourceProperties =
(KafkaDataSourceProperties) stmt.getDataSourceProperties();
- if (null != dataSourceProperties &&
dataSourceProperties.isOffsetsForTimes()) {
+ if (null != dataSourceProperties) {
// if the partition offset is set by timestamp, convert it to real
offset
- convertTimestampToOffset(dataSourceProperties);
+ convertOffset(dataSourceProperties);
}
writeLock();
@@ -659,13 +666,17 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
}
}
- private void convertTimestampToOffset(KafkaDataSourceProperties
dataSourceProperties) throws UserException {
+ private void convertOffset(KafkaDataSourceProperties dataSourceProperties)
throws UserException {
List<Pair<Integer, Long>> partitionOffsets =
dataSourceProperties.getKafkaPartitionOffsets();
if (partitionOffsets.isEmpty()) {
return;
}
- List<Pair<Integer, Long>> newOffsets =
KafkaUtil.getOffsetsForTimes(brokerList, topic,
- convertedCustomProperties, partitionOffsets);
+ List<Pair<Integer, Long>> newOffsets;
+ if (dataSourceProperties.isOffsetsForTimes()) {
+ newOffsets = KafkaUtil.getOffsetsForTimes(brokerList, topic,
convertedCustomProperties, partitionOffsets);
+ } else {
+ newOffsets = KafkaUtil.getRealOffsets(brokerList, topic,
convertedCustomProperties, partitionOffsets);
+ }
dataSourceProperties.setKafkaPartitionOffsets(newOffsets);
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
index ba3c9ee626a..d188838ad36 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
@@ -31,6 +31,7 @@ import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
@@ -287,6 +288,20 @@ public class KafkaRoutineLoadJobTest {
}
};
+ new MockUp<KafkaUtil>() {
+ @Mock
+ public List<Pair<Integer, Long>> getRealOffsets(String brokerList,
String topic,
+ Map<String,
String> convertedCustomProperties,
+
List<Pair<Integer, Long>> offsetFlags)
+ throws
LoadException {
+ List<Pair<Integer, Long>> pairList = new ArrayList<>();
+ pairList.add(Pair.of(1, 0L));
+ pairList.add(Pair.of(2, 0L));
+ pairList.add(Pair.of(3, 0L));
+ return pairList;
+ }
+ };
+
KafkaRoutineLoadJob kafkaRoutineLoadJob =
KafkaRoutineLoadJob.fromCreateStmt(createRoutineLoadStmt);
Assert.assertEquals(jobName, kafkaRoutineLoadJob.getName());
Assert.assertEquals(dbId, kafkaRoutineLoadJob.getDbId());
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
index 8d903957453..d9494374c03 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
@@ -24,6 +24,8 @@ import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.InternalErrorCode;
+import org.apache.doris.common.LoadException;
+import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.datasource.InternalCatalog;
@@ -46,6 +48,7 @@ import org.apache.kafka.common.PartitionInfo;
import org.junit.Assert;
import org.junit.Test;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -274,6 +277,18 @@ public class RoutineLoadJobTest {
}
};
+ new MockUp<KafkaUtil>() {
+ @Mock
+ public List<Pair<Integer, Long>> getRealOffsets(String brokerList,
String topic,
+ Map<String,
String> convertedCustomProperties,
+
List<Pair<Integer, Long>> offsetFlags)
+ throws
LoadException {
+ List<Pair<Integer, Long>> pairList = new ArrayList<>();
+ pairList.add(Pair.of(1, 0L));
+ return pairList;
+ }
+ };
+
RoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob();
Deencapsulation.setField(routineLoadJob, "state",
RoutineLoadJob.JobState.RUNNING);
Deencapsulation.setField(routineLoadJob, "progress", kafkaProgress);
diff --git a/gensrc/proto/internal_service.proto
b/gensrc/proto/internal_service.proto
index 8e0608fd730..12b1b6b1eda 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -391,6 +391,8 @@ message PKafkaMetaProxyRequest {
repeated PIntegerPair offset_times = 3;
// optional for getting latest offsets of partitons
repeated int32 partition_id_for_latest_offsets = 4;
+ // optional for getting real offset for end/beginning flag
+ repeated PIntegerPair offset_flags = 5;
};
message PProxyRequest {
diff --git
a/regression-test/data/load_p0/routine_load/test_routine_load_schedule.out
b/regression-test/data/load_p0/routine_load/test_routine_load_schedule.out
new file mode 100644
index 00000000000..58613e1bdfb
--- /dev/null
+++ b/regression-test/data/load_p0/routine_load/test_routine_load_schedule.out
@@ -0,0 +1,23 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+8 2023-08-14 true 109 -31573 -1362465190
3990845741226497177 2732763251146840270 -25698.553
1.312831962567818E9 99999999.9 99999999.9 2023-03-07T14:13:19
2022-10-18 2023-07-16T05:03:13 D
PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme
{"animal":"lion","weight":200,"habitat":["savannah","grassland"]} true
1 2 3 4 5 6.0 7.0 888888888
999999999 2023-08-24 2023-08-24T12:00 2023-08-24
2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 \N
+20 2023-08-17 false -5 18158 784479801
1485484354598941738 -6632681928222776815 9708.431
-3.30432620706069E8 -99999999.9 99999999.9 2022-09-15T21:40:55
2023-02-23 2023-08-13T21:31:54 O X
2pYmX2vAhfEEHZZYPsgAmda1G7otnwx5TmUC879FPhDeIjvWI79ksBZpfFG2gp7jhCSbpZiecKGklB5SvG8tm31i5SUqe1xrWgLt4HSq7lMJWp75tx2kxD7pRIOpn
{"name":"Sarah","age":30,"city":"London","isMarried":false} true 1
2 3 4 5 6.0 7.0 888888888 999999999
2023-08-24 2023-08-24T12:00 2023-08-24 2023-08-24T12:00
我能吞下玻璃而不伤身体 我能吞下玻璃而不 [...]
+21 2023-08-18 false 63 -27847 -35409596
8638201997392767650 4919963231735304178 -23382.541
-1.803403621426313E9 -22009767.0 99999999.9 2023-03-31T10:56:14
2023-01-20 2023-02-18T13:37:52 N T
PSiFwUEx3eVFNtjlnQ70YkgZNvKrGmQ2DN5K9yYHiSdFWeEDB1UpL3Frt8z1kEAIWRDWqXZuyi
{"city":"Sydney","population":5312000,"area":2058.7} true 1 2
3 4 5 6.0 7.0 888888888 999999999
2023-08-24 2023-08-24T12:00 2023-08-24 2023-08-24T12:00
我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 \N
+31 2023-08-27 false 17 -18849 1728109133
3266501886640700374 527195452623418935 -24062.328
-1.514348021262435E9 -99999999.9 -99999999.9 2022-10-07T03:24:23
2022-09-25 \N 0 8
yKMiAntORoRa8svnMfcxlOPwwND1m5s2fdS26Xu6cfs6HK5SAibqIp9h8sZcpjHy4
{"team":"Manchester United","players":["Ronaldo","Rooney","Giggs"],"coach":"Ole
Gunnar Solskjaer"} true 1 2 3 4 5 6.0
7.0 888888888 999999999 2023-08-24 2023-08-24T12:00
2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体
我能吞下玻璃而不伤身体 \N
+41 2023-08-27 true -104 22750 \N 8527773271030840740
5554497317268279215 -5296.828 -1.71564688801304E9 -99999999.9
99999999.9 2022-12-02T17:56:44 2022-10-12 2023-02-19T07:02:54
V \N
E9GzQdTwX1ITUQz27IVznAs6Ca4WwprKk6Odjs6SH75D2F1089QiY3HQ52LXRD1V6xAWjhLE2hWgW3EdHuAOnUDVrb5V
{"food":"Sushi","price":10,"restaurant":"Sushi King"} true 1 2
3 4 5 6.0 7.0 888888888 999999999
2023-08-24 2023-08-24T12:00 2023-08-24 2023-08-24T12:00
我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 \N
+49 2023-08-08 false \N 16275 -2144851675
-2303421957908954634 -46526938720058765 -13141.143
-6.866322332302E8 99999999.9 -99999999.9 2022-09-01T00:16:01
2023-03-25 2022-09-07T14:59:03 s yvuILR2iNxfe8RRml
{"student":true,"name":"Alice","grade":9,"subjects":["math","science","history"]}
true 1 2 3 4 5 6.0 7.0
888888888 999999999 2023-08-24 2023-08-24T12:00
2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体
我能吞下玻璃而不伤身体 \N
+50 2023-08-06 true 109 -6330 1479023892
-8630800697573159428 -1645095773540208759 17880.96
-1.453844792013949E9 -99999999.9 -99999999.9 2022-09-22T02:03:21
2023-05-14 2023-03-25T02:18:34 m
JKnIgXvGVidGiWl9YRSi3mFI7wHKt1sBpWSadKF8VX3LAuElm4sdc9gtxREaUr57oikSYlU8We8h1MWqQlYNiJObl
{"city":"Tokyo","temperature":20.5,"humidity":75} true 1
2 3 4 5 6.0 7.0 888888888 999999999
2023-08-24 2023-08-24T12:00 2023-08-24 2023-08-24T12:00
我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 \N
+50 2023-08-24 true 15 14403 \N -6418906115745394180
9205303779366462513 -4331.549 -6.15112179557648E8 99999999.9
-99999999.9 2022-12-29T02:27:20 2023-06-01 2023-08-12T04:50:04
a
eCl38sztIvBQvGvGKyYZmyMXy9vIJx197iu3JwP9doJGcrYUl9Uova0rz4iCCgrjlAiZU18Fs9YtCq830nhM
{"band":"The Beatles","members":["John Lennon","Paul McCartney","George
Harrison","Ringo Starr"]} true 1 2 3 4 5
6.0 7.0 888888888 999999999 2023-08-24
2023-08-24T12:00 2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体
我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 \N
+57 2023-08-19 true 2 -25462 -74112029
6458082754318544493 -7910671781690629051 -15205.859
-3.06870797484914E8 99999999.9 -99999999.9 2023-07-10T18:39:10
2023-02-12 2023-01-27T07:26:06 y
Xi9nDVrLv8m6AwEpUxmtzFAuK48sQ {"name":"John","age":25,"city":"New York"}
true 1 2 3 4 5 6.0 7.0 888888888
999999999 2023-08-24 2023-08-24T12:00 2023-08-24
2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 \N
+58 2023-08-22 \N 0 -18231 1832867360
6997858407575297145 2480714305422728023 -5450.489
1.475901032138386E9 -99999999.9 -99999999.9 2023-02-02T05:13:24
2022-09-18 2023-04-23T10:51:15 k
LdFXF7Kmfzgmnn2R6zLsXdmi3A2cLBLq4G4WDVNDhxvH7dYH8Kga2WA47uSIxp6NSrwPSdw0ssB1TS8RFJTDJAB0Uba3e05NL2Aiw0ja
{"restaurant":"Pizza Hut","menu":["pizza","pasta","salad"]} true
1 2 3 4 5 6.0 7.0 888888888
999999999 2023-08-24 2023-08-24T12:00 2023-08-24
2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 \N
+60 2023-08-27 false -52 -2338 -757056972
1047567408607120856 6541476642780646552 6614.0894
-1.204448798517855E9 99999999.9 99999999.9 2022-12-29T14:47:30
2022-09-24 2023-08-01T12:41:59 O F
RM4F1Ke7lkcnuxF2nK0j9VBW3MDcgyHR4pseBjtFnqS6GUkVFuzF6u3Cp9Nv7ab0O6UYrpP4DhU
{"game":"Chess","players":2,"time":"1 hour"} true 1 2 3
4 5 6.0 7.0 888888888 999999999 2023-08-24
2023-08-24T12:00 2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体
我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 \N
+62 2023-08-21 false 81 20302 -200761532
6365479976421007608 \N -29916.533 1.709141750828478E9
99999999.9 -99999999.9 2023-05-04T01:14:51 2022-09-17
2022-12-04T19:30:09 d v
BKWy9dTNg1aZW7ancEJAmEDOPK5TwFsNSHbI78emu9gymeIlx5NoLmyii0QAqdzRvSQPZKiqKkwInGCTIBnK1yYkK7zD
{"username":"user123","password":"pass123","email":"[email protected]"}
true 1 2 3 4 5 6.0 7.0 888888888
999999999 2023-08-24 2023-08-24T12:00 2023-08-24
2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 \N
+65 2023-08-09 false 94 31514 814994517
-297697460695940343 734910652450318597 -13061.892
6.2750847041706E7 -9808654.0 \N 2023-08-14T22:01:27
2023-05-19 2022-11-13T13:44:28 V
aGeMsI24O12chGlP5ak0AHghAz7bu5MargJBStHnt0yMnChH0JnfYhsfH1u59XIHkJKMsHYktBqORkGlovu8V47E74KeFpaqxn5yLyXfDbhhzUKf
{"language":"Python","version":3.9,"frameworks":["Django","Flask"]}
true 1 2 3 4 5 6.0 7.0 888888888
999999999 2023-08-24 2023-08-24T12:00 2023-08-24
2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 \N
+66 2023-08-15 true -91 28378 609923317
4872185586197131212 1207709464099378591 \N -1.863683325985123E9
-99999999.9 -99999999.9 2022-09-24T10:39:23 2022-09-24
2022-10-16T18:36:43 Y z
AI1BSPQdKiHJiQH1kguyLSWsDXkC7zwy7PwgWnyGSaa9tBKRex8vHBdxg2QSKZKL2mV2lHz7iI1PnsTd4MXDcIKhqiHyPuQPt2tEtgt0UgF6
{"book":{"title":"The Great Gatsby","author":"F. Scott
Fitzgerald"},"year":1925} true 1 2 3 4 5
6.0 7.0 888888888 999999999 2023-08-24
2023-08-24T12:00 2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体
我能吞下玻璃而不伤 [...]
+68 2023-08-23 true -73 20117 1737338128
795638676048937749 -5551546237562433901 -30627.04
6.8589475684545E7 99999999.9 99999999.9 2022-12-28T20:26:51
2022-10-04 2023-07-30T00:20:06 y
keZ3JlWWpdnPBejf0cuiCQCVBBTd5gjvO08NVdcAFewqL7nRT4N9lnvSU6pWmletA5VbPQCeQapJdcnQCHfZUDCf4ulCnczyqr7SGrbGRT0XYcd7iktKM
{"country":"Brazil","continent":"South America","population":211049527} true
1 2 3 4 5 6.0 7.0 888888888
999999999 2023-08-24 2023-08-24T12:00 2023-08-24
2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而 [...]
+80 2023-08-18 false -18 -8971 679027874
6535956962935330265 3960889045799757165 -13219.76
1.187161924505394E9 -99999999.9 -99999999.9 2023-03-11T07:40
2022-11-29 2023-01-14T07:24:07 \N D
3Nhx6xX1qdwaq7lxwLRSKMtJFbC03swWv12mpySSVysH3igGZTiGPuKMsYW7HAkf6CWc7c0nzqDsjuH3FYVMNCWRmfxMrmY8rykQCC4Ve
{"car":"BMW","model":"X5","year":2020,"color":"black"} true 1
2 3 4 5 6.0 7.0 888888888 999999999
2023-08-24 2023-08-24T12:00 2023-08-24 2023-08-24T12:00
我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 \N
+81 2023-08-23 false 106 11492 -667795397
4480250461471356146 -5346660566234294101 9082.75 3.85167225902608E8
-99999999.9 99999999.9 2023-03-20T03:33:16 2022-11-24
2023-02-16T18:29:41 G 9
Lk3eNVQNjucbekD1rZmUlGPiXS5JvcWr2LQzRU8GSGIbSag
{"flower":"rose","color":"red","fragrance":true} true 1 2
3 4 5 6.0 7.0 888888888 999999999
2023-08-24 2023-08-24T12:00 2023-08-24 2023-08-24T12:00
我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 \N
+85 2023-08-11 true -7 24304 -2043877415
-2024144417867729183 \N 5363.0244 -5.78615669042831E8
-99999999.9 -99999999.9 2023-07-15T01:07:41 2023-08-13
2023-01-20T11:57:48 i WQ9dh9ajPu0y
{"country":"France","capital":"Paris","population":67081000} true 1
2 3 4 5 6.0 7.0 888888888 999999999
2023-08-24 2023-08-24T12:00 2023-08-24 2023-08-24T12:00
我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 \N
+90 2023-08-27 true 22 16456 -1476824962
-3279894870153540825 8990195191470116763 26651.906
2.06860148942546E8 -99999999.9 -99999999.9 2022-10-07T03:11:03
2023-03-18 2023-04-15T00:38:33 T L
QW0GQ3GoMtHgxPQOWGfVaveynahNpsNs09siMFA1OtO6QEDBQTdivmGyq7bFzejAqwbbVQQpREAmeLjcFSXLnQuou2KbwYD
{"company":"Apple","products":[{"name":"iPhone","price":1000},{"name":"MacBook","price":1500}]}
true 1 2 3 4 5 6.0 7.0 888888888
999999999 2023-08-24 2023-08-24T12:00 2023-08-24
2023-08-24T12:00 我能吞下玻璃而不伤身体 [...]
+91 2023-08-27 true 90 2465 702240964
6373830997821598984 305860046137409400 15991.356
1.599972327386147E9 -99999999.9 \N 2023-04-26T19:31:10
2023-07-21 \N 2
B7YKYBYT8w0YC926bZ8Yz1VzyiWw2NWDAiTlEoPVyz9AXGti2Npg1FxWqWk4hEaALw0ZBSuiAIPj41lq36g5QRpPmAjNPK
{"fruit":"apple","color":"red","qty":5,"price":2.5} true 1 2
3 4 5 6.0 7.0 888888888 999999999
2023-08-24 2023-08-24T12:00 2023-08-24 2023-08-24T12:00
我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 \N
+
diff --git a/regression-test/suites/load_p0/routine_load/data/test_schedule.csv
b/regression-test/suites/load_p0/routine_load/data/test_schedule.csv
new file mode 100644
index 00000000000..b58285ed575
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/test_schedule.csv
@@ -0,0 +1,20 @@
+57|2023-08-19|TRUE|2|-25462|-74112029|6458082754318544493|-7910671781690629051|-15205.859375|-306870797.484914|759730669.0|-628556336.0|2023-07-10
18:39:10|2023-02-12|2023-01-27
07:26:06|y||Xi9nDVrLv8m6AwEpUxmtzFAuK48sQ|{"name": "John", "age": 25, "city":
"New York"}
+49|2023-08-08|FALSE|\N|16275|-2144851675|-2303421957908954634|-46526938720058765|-13141.142578|-686632233.230200|229942298.0|-152553823.0|2022-09-01
00:16:01|2023-03-25|2022-09-07 14:59:03|s||yvuILR2iNxfe8RRml|{"student": true,
"name": "Alice", "grade": 9, "subjects": ["math", "science", "history"]}
+66|2023-08-15|TRUE|-91|28378|609923317|4872185586197131212|1207709464099378591|\N|-1863683325.985123|-783792012.0|-708986976.0|2022-09-24
10:39:23|2022-09-24|2022-10-16
18:36:43|Y|z|AI1BSPQdKiHJiQH1kguyLSWsDXkC7zwy7PwgWnyGSaa9tBKRex8vHBdxg2QSKZKL2mV2lHz7iI1PnsTd4MXDcIKhqiHyPuQPt2tEtgt0UgF6|{"book":
{"title": "The Great Gatsby", "author": "F. Scott Fitzgerald"}, "year": 1925}
+91|2023-08-27|TRUE|90|2465|702240964|6373830997821598984|305860046137409400|15991.356445|1599972327.386147|-165530947.0|\N|2023-04-26
19:31:10|2023-07-21|\N|2||B7YKYBYT8w0YC926bZ8Yz1VzyiWw2NWDAiTlEoPVyz9AXGti2Npg1FxWqWk4hEaALw0ZBSuiAIPj41lq36g5QRpPmAjNPK|{"fruit":
"apple", "color": "red", "qty": 5, "price": 2.5}
+80|2023-08-18|FALSE|-18|-8971|679027874|6535956962935330265|3960889045799757165|-13219.759766|1187161924.505394|-526615878.0|-947410627.0|2023-03-11
07:40:00|2022-11-29|2023-01-14
07:24:07|\N|D|3Nhx6xX1qdwaq7lxwLRSKMtJFbC03swWv12mpySSVysH3igGZTiGPuKMsYW7HAkf6CWc7c0nzqDsjuH3FYVMNCWRmfxMrmY8rykQCC4Ve|{"car":
"BMW", "model": "X5", "year": 2020, "color": "black"}
+85|2023-08-11|TRUE|-7|24304|-2043877415|-2024144417867729183|\N|5363.024414|-578615669.042831|-378574346.0|-810302932.0|2023-07-15
01:07:41|2023-08-13|2023-01-20 11:57:48|i||WQ9dh9ajPu0y|{"country": "France",
"capital": "Paris", "population": 67081000}
+31|2023-08-27|FALSE|17|-18849|1728109133|3266501886640700374|527195452623418935|-24062.328125|-1514348021.262435|-322205854.0|-278237157.0|2022-10-07
03:24:23|2022-09-25|\N|0|8|yKMiAntORoRa8svnMfcxlOPwwND1m5s2fdS26Xu6cfs6HK5SAibqIp9h8sZcpjHy4|{"team":
"Manchester United", "players": ["Ronaldo", "Rooney", "Giggs"], "coach": "Ole
Gunnar Solskjaer"}
+20|2023-08-17|FALSE|-5|18158|784479801|1485484354598941738|-6632681928222776815|9708.430664|-330432620.706069|-816424174.0|571112646.0|2022-09-15
21:40:55|2023-02-23|2023-08-13
21:31:54|O|X|2pYmX2vAhfEEHZZYPsgAmda1G7otnwx5TmUC879FPhDeIjvWI79ksBZpfFG2gp7jhCSbpZiecKGklB5SvG8tm31i5SUqe1xrWgLt4HSq7lMJWp75tx2kxD7pRIOpn|{"name":
"Sarah", "age": 30, "city": "London", "isMarried": false}
+90|2023-08-27|TRUE|22|16456|-1476824962|-3279894870153540825|8990195191470116763|26651.906250|206860148.942546|-580959198.0|-210329147.0|2022-10-07
03:11:03|2023-03-18|2023-04-15
00:38:33|T|L|QW0GQ3GoMtHgxPQOWGfVaveynahNpsNs09siMFA1OtO6QEDBQTdivmGyq7bFzejAqwbbVQQpREAmeLjcFSXLnQuou2KbwYD|{"company":
"Apple", "products": [{"name": "iPhone", "price": 1000}, {"name": "MacBook",
"price": 1500}]}
+8|2023-08-14|TRUE|109|-31573|-1362465190|3990845741226497177|2732763251146840270|-25698.552734|1312831962.567818|771983879.0|173937916.0|2023-03-07
14:13:19|2022-10-18|2023-07-16
05:03:13|D||PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme|{"animal":
"lion", "weight": 200, "habitat": ["savannah", "grassland"]}
+65|2023-08-09|FALSE|94|31514|814994517|-297697460695940343|734910652450318597|-13061.891602|62750847.041706|-9808654.0|\N|2023-08-14
22:01:27|2023-05-19|2022-11-13
13:44:28|V||aGeMsI24O12chGlP5ak0AHghAz7bu5MargJBStHnt0yMnChH0JnfYhsfH1u59XIHkJKMsHYktBqORkGlovu8V47E74KeFpaqxn5yLyXfDbhhzUKf|{"language":
"Python", "version": 3.9, "frameworks": ["Django", "Flask"]}
+62|2023-08-21|FALSE|81|20302|-200761532|6365479976421007608|\N|-29916.533203|1709141750.828478|549873536.0|-119205359.0|2023-05-04
01:14:51|2022-09-17|2022-12-04
19:30:09|d|v|BKWy9dTNg1aZW7ancEJAmEDOPK5TwFsNSHbI78emu9gymeIlx5NoLmyii0QAqdzRvSQPZKiqKkwInGCTIBnK1yYkK7zD|{"username":
"user123", "password": "pass123", "email": "[email protected]"}
+50|2023-08-06|TRUE|109|-6330|1479023892|-8630800697573159428|-1645095773540208759|17880.960938|-1453844792.013949|-158871820.0|-862940384.0|2022-09-22
02:03:21|2023-05-14|2023-03-25
02:18:34|m||JKnIgXvGVidGiWl9YRSi3mFI7wHKt1sBpWSadKF8VX3LAuElm4sdc9gtxREaUr57oikSYlU8We8h1MWqQlYNiJObl|{"city":
"Tokyo", "temperature": 20.5, "humidity": 75}
+58|2023-08-22|\N|0|-18231|1832867360|6997858407575297145|2480714305422728023|-5450.488770|1475901032.138386|-893480655.0|-607891858.0|2023-02-02
05:13:24|2022-09-18|2023-04-23
10:51:15|k||LdFXF7Kmfzgmnn2R6zLsXdmi3A2cLBLq4G4WDVNDhxvH7dYH8Kga2WA47uSIxp6NSrwPSdw0ssB1TS8RFJTDJAB0Uba3e05NL2Aiw0ja|{"restaurant":
"Pizza Hut", "menu": ["pizza", "pasta", "salad"]}
+60|2023-08-27|FALSE|-52|-2338|-757056972|1047567408607120856|6541476642780646552|6614.089355|-1204448798.517855|236657733.0|731515433.0|2022-12-29
14:47:30|2022-09-24|2023-08-01
12:41:59|O|F|RM4F1Ke7lkcnuxF2nK0j9VBW3MDcgyHR4pseBjtFnqS6GUkVFuzF6u3Cp9Nv7ab0O6UYrpP4DhU|{"game":
"Chess", "players": 2, "time": "1 hour"}
+68|2023-08-23|TRUE|-73|20117|1737338128|795638676048937749|-5551546237562433901|-30627.039062|68589475.684545|585022347.0|513722420.0|2022-12-28
20:26:51|2022-10-04|2023-07-30
00:20:06|y||keZ3JlWWpdnPBejf0cuiCQCVBBTd5gjvO08NVdcAFewqL7nRT4N9lnvSU6pWmletA5VbPQCeQapJdcnQCHfZUDCf4ulCnczyqr7SGrbGRT0XYcd7iktKM|{"country":
"Brazil", "continent": "South America", "population": 211049527}
+50|2023-08-24|TRUE|15|14403|\N|-6418906115745394180|9205303779366462513|-4331.548828|-615112179.557648|367305015.0|-551652958.0|2022-12-29
02:27:20|2023-06-01|2023-08-12
04:50:04|a||eCl38sztIvBQvGvGKyYZmyMXy9vIJx197iu3JwP9doJGcrYUl9Uova0rz4iCCgrjlAiZU18Fs9YtCq830nhM|{"band":
"The Beatles", "members": ["John Lennon", "Paul McCartney", "George Harrison",
"Ringo Starr"]}
+81|2023-08-23|FALSE|106|11492|-667795397|4480250461471356146|-5346660566234294101|9082.750000|385167225.902608|-717553011.0|649146853.0|2023-03-20
03:33:16|2022-11-24|2023-02-16
18:29:41|G|9|Lk3eNVQNjucbekD1rZmUlGPiXS5JvcWr2LQzRU8GSGIbSag|{"flower": "rose",
"color": "red", "fragrance": true}
+41|2023-08-27|TRUE|-104|22750|\N|8527773271030840740|5554497317268279215|-5296.828125|-1715646888.013040|-306075962.0|897769189.0|2022-12-02
17:56:44|2022-10-12|2023-02-19
07:02:54|V|\N|E9GzQdTwX1ITUQz27IVznAs6Ca4WwprKk6Odjs6SH75D2F1089QiY3HQ52LXRD1V6xAWjhLE2hWgW3EdHuAOnUDVrb5V|{"food":
"Sushi", "price": 10, "restaurant": "Sushi King"}
+21|2023-08-18|FALSE|63|-27847|-35409596|8638201997392767650|4919963231735304178|-23382.541016|-1803403621.426313|-22009767.0|661750756.0|2023-03-31
10:56:14|2023-01-20|2023-02-18
13:37:52|N|T|PSiFwUEx3eVFNtjlnQ70YkgZNvKrGmQ2DN5K9yYHiSdFWeEDB1UpL3Frt8z1kEAIWRDWqXZuyi|{"city":
"Sydney", "population": 5312000, "area": 2058.7}
\ No newline at end of file
diff --git
a/regression-test/suites/load_p0/routine_load/test_routine_load_schedule.groovy
b/regression-test/suites/load_p0/routine_load/test_routine_load_schedule.groovy
new file mode 100644
index 00000000000..c8044ad1404
--- /dev/null
+++
b/regression-test/suites/load_p0/routine_load/test_routine_load_schedule.groovy
@@ -0,0 +1,222 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+
+suite("test_routine_load_schedule","p0") {
+ def kafkaCsvTpoics = [
+ "test_schedule",
+ ]
+
+ String enabled = context.config.otherConfigs.get("enableKafkaTest")
+ String kafka_port = context.config.otherConfigs.get("kafka_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ def kafka_broker = "${externalEnvIp}:${kafka_port}"
+
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ // define kafka
+ def props = new Properties()
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"${kafka_broker}".toString())
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
+ // Create kafka producer
+ def producer = new KafkaProducer<>(props)
+
+ for (String kafkaCsvTopic in kafkaCsvTpoics) {
+ def txt = new
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+ def lines = txt.readLines()
+ lines.each { line ->
+ logger.info("=====${line}========")
+ def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+ producer.send(record)
+ }
+ }
+ }
+
+ sleep(10000)
+
+ def jobName = "testScheduleJob"
+ def tableName = "test_routine_load_schedule"
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ try {
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName}
+ (
+ k00 INT NOT NULL,
+ k01 DATE NOT NULL,
+ k02 BOOLEAN NULL,
+ k03 TINYINT NULL,
+ k04 SMALLINT NULL,
+ k05 INT NULL,
+ k06 BIGINT NULL,
+ k07 LARGEINT NULL,
+ k08 FLOAT NULL,
+ k09 DOUBLE NULL,
+ k10 DECIMAL(9,1) NULL,
+ k11 DECIMALV3(9,1) NULL,
+ k12 DATETIME NULL,
+ k13 DATEV2 NULL,
+ k14 DATETIMEV2 NULL,
+ k15 CHAR NULL,
+ k16 VARCHAR NULL,
+ k17 STRING NULL,
+ k18 JSON NULL,
+ kd01 BOOLEAN NOT NULL DEFAULT "TRUE",
+ kd02 TINYINT NOT NULL DEFAULT "1",
+ kd03 SMALLINT NOT NULL DEFAULT "2",
+ kd04 INT NOT NULL DEFAULT "3",
+ kd05 BIGINT NOT NULL DEFAULT "4",
+ kd06 LARGEINT NOT NULL DEFAULT "5",
+ kd07 FLOAT NOT NULL DEFAULT "6.0",
+ kd08 DOUBLE NOT NULL DEFAULT "7.0",
+ kd09 DECIMAL NOT NULL DEFAULT "888888888",
+ kd10 DECIMALV3 NOT NULL DEFAULT "999999999",
+ kd11 DATE NOT NULL DEFAULT "2023-08-24",
+ kd12 DATETIME NOT NULL DEFAULT "2023-08-24 12:00:00",
+ kd13 DATEV2 NOT NULL DEFAULT "2023-08-24",
+ kd14 DATETIMEV2 NOT NULL DEFAULT "2023-08-24 12:00:00",
+ kd15 CHAR(255) NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+ kd16 VARCHAR(300) NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+ kd17 STRING NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+ kd18 JSON NULL,
+
+ INDEX idx_inverted_k104 (`k05`) USING INVERTED,
+ INDEX idx_inverted_k110 (`k11`) USING INVERTED,
+ INDEX idx_inverted_k113 (`k13`) USING INVERTED,
+ INDEX idx_inverted_k114 (`k14`) USING INVERTED,
+ INDEX idx_inverted_k117 (`k17`) USING INVERTED
PROPERTIES("parser" = "english"),
+ INDEX idx_ngrambf_k115 (`k15`) USING NGRAM_BF
PROPERTIES("gram_size"="3", "bf_size"="256"),
+ INDEX idx_ngrambf_k116 (`k16`) USING NGRAM_BF
PROPERTIES("gram_size"="3", "bf_size"="256"),
+ INDEX idx_ngrambf_k117 (`k17`) USING NGRAM_BF
PROPERTIES("gram_size"="3", "bf_size"="256"),
+
+ INDEX idx_bitmap_k104 (`k02`) USING BITMAP,
+ INDEX idx_bitmap_k110 (`kd01`) USING BITMAP
+
+ )
+ DUPLICATE KEY(k00)
+ PARTITION BY RANGE(k01)
+ (
+ PARTITION p1 VALUES [('2023-08-01'), ('2023-08-11')),
+ PARTITION p2 VALUES [('2023-08-11'), ('2023-08-21')),
+ PARTITION p3 VALUES [('2023-08-21'), ('2023-09-01'))
+ )
+ DISTRIBUTED BY HASH(k00) BUCKETS 32
+ PROPERTIES (
+ "bloom_filter_columns"="k05",
+ "replication_num" = "1"
+ );
+ """
+ sql "sync"
+
+ sql """
+ CREATE ROUTINE LOAD ${jobName} on ${tableName}
+
COLUMNS(k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18),
+ COLUMNS TERMINATED BY "|"
+ PROPERTIES
+ (
+ "max_batch_interval" = "5",
+ "max_batch_rows" = "300000",
+ "max_batch_size" = "209715200"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+ "kafka_topic" = "test_schedule",
+ "property.kafka_default_offsets" = "OFFSET_END"
+ );
+ """
+ sql "sync"
+
+ def count = 0
+ while (true) {
+ sleep(1000)
+ def res = sql "show routine load for ${jobName}"
+ def state = res[0][8].toString()
+ if (state != "RUNNING") {
+ count++
+ if (count > 300) {
+ assertEquals(1, 2)
+ }
+ continue;
+ }
+ break;
+ }
+
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ // define kafka
+ def props = new Properties()
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"${kafka_broker}".toString())
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
+ // Create kafka producer
+ def producer = new KafkaProducer<>(props)
+
+ for (String kafkaCsvTopic in kafkaCsvTpoics) {
+ def txt = new
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+ def lines = txt.readLines()
+ lines.each { line ->
+ logger.info("=====${line}========")
+ def record = new ProducerRecord<>(kafkaCsvTopic, null,
line)
+ producer.send(record)
+ }
+ }
+ }
+
+ sleep(5000)
+
+ count = 0
+ while (true) {
+ sleep(1000)
+ def res = sql "show routine load for ${jobName}"
+ def state = res[0][8].toString()
+ if (state != "RUNNING") {
+ count++
+ if (count > 60) {
+ assertEquals(1, 2)
+ }
+ continue;
+ }
+ log.info("reason of state changed:
${res[0][11].toString()}".toString())
+ break;
+ }
+ while (true) {
+ sleep(1000)
+ def res = sql "show routine load for ${jobName}"
+ log.info("routine load statistic:
${res[0][14].toString()}".toString())
+ log.info("progress: ${res[0][15].toString()}".toString())
+ log.info("lag: ${res[0][16].toString()}".toString())
+ def json = parseJson(res[0][14])
+ if (json.loadedRows.toString() != "20") {
+ count++
+ if (count > 60) {
+ assertEquals(1, 2)
+ }
+ continue;
+ }
+ break;
+ }
+ qt_sql "select * from ${tableName} order by k00, k01"
+ } finally {
+ sql "stop routine load for ${jobName}"
+ sql "DROP TABLE IF EXISTS ${tableName}"
+ }
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]