This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 7ba66c58909 [branch-2.1](routine-load) do not schedule task when there 
is no data (#34654)
7ba66c58909 is described below

commit 7ba66c58909101e3af18646fcd5e751784f9fa97
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Sat May 11 11:01:18 2024 +0800

    [branch-2.1](routine-load) do not schedule task when there is no data 
(#34654)
---
 be/src/runtime/routine_load/data_consumer.cpp      |  45 +++++
 be/src/runtime/routine_load/data_consumer.h        |   3 +
 .../routine_load/routine_load_task_executor.cpp    |  23 +++
 .../routine_load/routine_load_task_executor.h      |   4 +
 be/src/service/internal_service.cpp                |  18 +-
 .../apache/doris/datasource/kafka/KafkaUtil.java   |  76 +++++++
 .../load/routineload/KafkaRoutineLoadJob.java      |  37 ++--
 .../load/routineload/KafkaRoutineLoadJobTest.java  |  15 ++
 .../doris/load/routineload/RoutineLoadJobTest.java |  15 ++
 gensrc/proto/internal_service.proto                |   2 +
 .../routine_load/test_routine_load_schedule.out    |  23 +++
 .../load_p0/routine_load/data/test_schedule.csv    |  20 ++
 .../routine_load/test_routine_load_schedule.groovy | 222 +++++++++++++++++++++
 13 files changed, 489 insertions(+), 14 deletions(-)

diff --git a/be/src/runtime/routine_load/data_consumer.cpp 
b/be/src/runtime/routine_load/data_consumer.cpp
index ccf5fb4cb25..9c40e85e281 100644
--- a/be/src/runtime/routine_load/data_consumer.cpp
+++ b/be/src/runtime/routine_load/data_consumer.cpp
@@ -415,6 +415,51 @@ Status 
KafkaDataConsumer::get_latest_offsets_for_partitions(
     return Status::OK();
 }
 
+Status KafkaDataConsumer::get_real_offsets_for_partitions(
+        const std::vector<PIntegerPair>& offset_flags, 
std::vector<PIntegerPair>* offsets,
+        int timeout) {
+    MonotonicStopWatch watch;
+    watch.start();
+    for (const auto& entry : offset_flags) {
+        PIntegerPair pair;
+        if (UNLIKELY(entry.val() >= 0)) {
+            pair.set_key(entry.key());
+            pair.set_val(entry.val());
+            offsets->push_back(std::move(pair));
+            continue;
+        }
+
+        int64_t low = 0;
+        int64_t high = 0;
+        auto timeout_ms = timeout - static_cast<int>(watch.elapsed_time() / 
1000 / 1000);
+        if (UNLIKELY(timeout_ms <= 0)) {
+            return Status::InternalError("get kafka real offsets for 
partitions timeout");
+        }
+
+        RdKafka::ErrorCode err =
+                _k_consumer->query_watermark_offsets(_topic, entry.key(), 
&low, &high, timeout_ms);
+        if (UNLIKELY(err != RdKafka::ERR_NO_ERROR)) {
+            std::stringstream ss;
+            ss << "failed to get real offset for partition: " << entry.key()
+               << ", err: " << RdKafka::err2str(err);
+            LOG(WARNING) << ss.str();
+            return Status::InternalError(ss.str());
+        }
+
+        pair.set_key(entry.key());
+        if (entry.val() == -1) {
+            // OFFSET_END_VAL = -1
+            pair.set_val(high);
+        } else if (entry.val() == -2) {
+            // OFFSET_BEGINNING_VAL = -2
+            pair.set_val(low);
+        }
+        offsets->push_back(std::move(pair));
+    }
+
+    return Status::OK();
+}
+
 Status KafkaDataConsumer::cancel(std::shared_ptr<StreamLoadContext> ctx) {
     std::unique_lock<std::mutex> l(_lock);
     if (!_init) {
diff --git a/be/src/runtime/routine_load/data_consumer.h 
b/be/src/runtime/routine_load/data_consumer.h
index f6c10467786..7ae30b51729 100644
--- a/be/src/runtime/routine_load/data_consumer.h
+++ b/be/src/runtime/routine_load/data_consumer.h
@@ -155,6 +155,9 @@ public:
     // get latest offsets for partitions
     Status get_latest_offsets_for_partitions(const std::vector<int32_t>& 
partition_ids,
                                              std::vector<PIntegerPair>* 
offsets, int timeout);
+    // get real offsets for OFFSET_BEGINNING/OFFSET_END flags
+    Status get_real_offsets_for_partitions(const std::vector<PIntegerPair>& 
offset_flags,
+                                           std::vector<PIntegerPair>* offsets, 
int timeout);
 
 private:
     std::string _brokers;
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp 
b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index 70441552542..0534531aed2 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -174,6 +174,29 @@ Status 
RoutineLoadTaskExecutor::get_kafka_latest_offsets_for_partitions(
     return st;
 }
 
+Status RoutineLoadTaskExecutor::get_kafka_real_offsets_for_partitions(
+        const PKafkaMetaProxyRequest& request, std::vector<PIntegerPair>* 
partition_offsets,
+        int timeout) {
+    CHECK(request.has_kafka_info());
+
+    // This context is meaningless, just for unifying the interface
+    std::shared_ptr<StreamLoadContext> ctx = 
std::make_shared<StreamLoadContext>(_exec_env);
+    RETURN_IF_ERROR(_prepare_ctx(request, ctx));
+
+    std::shared_ptr<DataConsumer> consumer;
+    RETURN_IF_ERROR(_data_consumer_pool.get_consumer(ctx, &consumer));
+
+    Status st =
+            
std::static_pointer_cast<KafkaDataConsumer>(consumer)->get_real_offsets_for_partitions(
+                    std::vector<PIntegerPair>(request.offset_flags().begin(),
+                                              request.offset_flags().end()),
+                    partition_offsets, timeout);
+    if (st.ok()) {
+        _data_consumer_pool.return_consumer(consumer);
+    }
+    return st;
+}
+
 Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
     std::unique_lock<std::mutex> l(_lock);
     if (_task_map.find(task.id) != _task_map.end()) {
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h 
b/be/src/runtime/routine_load/routine_load_task_executor.h
index e75c8eb8811..8f8a6f2d653 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.h
+++ b/be/src/runtime/routine_load/routine_load_task_executor.h
@@ -67,6 +67,10 @@ public:
                                                    std::vector<PIntegerPair>* 
partition_offsets,
                                                    int timeout);
 
+    Status get_kafka_real_offsets_for_partitions(const PKafkaMetaProxyRequest& 
request,
+                                                 std::vector<PIntegerPair>* 
partition_offsets,
+                                                 int timeout);
+
 private:
     // execute the task
     void exec_task(std::shared_ptr<StreamLoadContext> ctx, DataConsumerPool* 
pool,
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 30b948f1d29..6abc972634a 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -1176,7 +1176,23 @@ void 
PInternalServiceImpl::get_info(google::protobuf::RpcController* controller,
         int timeout_ms = request->has_timeout_secs() ? request->timeout_secs() 
* 1000 : 5 * 1000;
         if (request->has_kafka_meta_request()) {
             const PKafkaMetaProxyRequest& kafka_request = 
request->kafka_meta_request();
-            if (!kafka_request.partition_id_for_latest_offsets().empty()) {
+            if (!kafka_request.offset_flags().empty()) {
+                std::vector<PIntegerPair> partition_offsets;
+                Status st = _exec_env->routine_load_task_executor()
+                                    ->get_kafka_real_offsets_for_partitions(
+                                            request->kafka_meta_request(), 
&partition_offsets,
+                                            timeout_ms);
+                if (st.ok()) {
+                    PKafkaPartitionOffsets* part_offsets = 
response->mutable_partition_offsets();
+                    for (const auto& entry : partition_offsets) {
+                        PIntegerPair* res = part_offsets->add_offset_times();
+                        res->set_key(entry.key());
+                        res->set_val(entry.val());
+                    }
+                }
+                st.to_protobuf(response->mutable_status());
+                return;
+            } else if 
(!kafka_request.partition_id_for_latest_offsets().empty()) {
                 // get latest offsets for specified partition ids
                 std::vector<PIntegerPair> partition_offsets;
                 Status st = _exec_env->routine_load_task_executor()
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
index d2184229bb4..00169f4c8ea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
@@ -31,6 +31,7 @@ import com.google.common.collect.Lists;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -217,4 +218,79 @@ public class KafkaUtil {
                     "Failed to get latest offsets of kafka topic: " + topic + 
". error: " + e.getMessage());
         }
     }
+
+    public static List<Pair<Integer, Long>> getRealOffsets(String brokerList, 
String topic,
+                                                             Map<String, 
String> convertedCustomProperties,
+                                                             
List<Pair<Integer, Long>> offsets)
+                                                             throws 
LoadException {
+        // filter out values >= 0 as these offsets are already real offsets
+        // only update offset like OFFSET_BEGINNING or OFFSET_END
+        List<Pair<Integer, Long>> offsetFlags = new ArrayList<>();
+        List<Pair<Integer, Long>> realOffsets = new ArrayList<>();
+        for (Pair<Integer, Long> pair : offsets) {
+            if (pair.second < 0) {
+                offsetFlags.add(pair);
+            } else {
+                realOffsets.add(pair);
+            }
+        }
+        if (offsetFlags.size() == 0) {
+            LOG.info("do not need update and directly return offsets for 
partitions {} in topic: {}", offsets, topic);
+            return offsets;
+        }
+
+        TNetworkAddress address = null;
+        try {
+            List<Long> backendIds = 
Env.getCurrentSystemInfo().getAllBackendIds(true);
+            if (backendIds.isEmpty()) {
+                throw new LoadException("Failed to get real offsets. No alive 
backends");
+            }
+            Collections.shuffle(backendIds);
+            Backend be = 
Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
+            address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
+
+            // create request
+            InternalService.PKafkaMetaProxyRequest.Builder metaRequestBuilder =
+                    InternalService.PKafkaMetaProxyRequest.newBuilder()
+                            
.setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
+                                    .setBrokers(brokerList)
+                                    .setTopic(topic)
+                                    .addAllProperties(
+                                            
convertedCustomProperties.entrySet().stream().map(
+                                                    e -> 
InternalService.PStringPair.newBuilder()
+                                                            .setKey(e.getKey())
+                                                            
.setVal(e.getValue())
+                                                            .build()
+                                            ).collect(Collectors.toList())
+                                    )
+                            );
+            for (Pair<Integer, Long> pair : offsetFlags) {
+                
metaRequestBuilder.addOffsetFlags(InternalService.PIntegerPair.newBuilder().setKey(pair.first)
+                        .setVal(pair.second).build());
+            }
+            InternalService.PProxyRequest request = 
InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
+                    
metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
+
+            // get info
+            Future<InternalService.PProxyResult> future = 
BackendServiceProxy.getInstance().getInfo(address, request);
+            InternalService.PProxyResult result = 
future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+            TStatusCode code = 
TStatusCode.findByValue(result.getStatus().getStatusCode());
+            if (code != TStatusCode.OK) {
+                throw new UserException("failed to get real offsets: " + 
result.getStatus().getErrorMsgsList());
+            } else {
+                List<InternalService.PIntegerPair> pairs = 
result.getPartitionOffsets().getOffsetTimesList();
+                List<Pair<Integer, Long>> partitionOffsets = 
Lists.newArrayList();
+                for (InternalService.PIntegerPair pair : pairs) {
+                    partitionOffsets.add(Pair.of(pair.getKey(), 
pair.getVal()));
+                }
+                realOffsets.addAll(partitionOffsets);
+                LOG.info("finish to get real offsets for partitions {} in 
topic: {}", realOffsets, topic);
+                return realOffsets;
+            }
+        } catch (Exception e) {
+            LOG.warn("failed to get real offsets.", e);
+            throw new LoadException(
+                    "Failed to get real offsets of kafka topic: " + topic + ". 
error: " + e.getMessage());
+        }
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index c00f16b7d8a..43ae98d8f7a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -512,17 +512,20 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         for (Integer kafkaPartition : newPartitions) {
             partitionOffsets.add(Pair.of(kafkaPartition, beginOffset));
         }
-        if (isOffsetForTimes()) {
-            try {
+        try {
+            if (isOffsetForTimes()) {
                 partitionOffsets = 
KafkaUtil.getOffsetsForTimes(this.brokerList,
                         this.topic, convertedCustomProperties, 
partitionOffsets);
-            } catch (LoadException e) {
-                LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
-                        .add("partition:timestamp", 
Joiner.on(",").join(partitionOffsets))
-                        .add("error_msg", "Job failed to fetch current offsets 
from times with error " + e.getMessage())
-                        .build(), e);
-                throw new UserException(e);
+            } else {
+                partitionOffsets = KafkaUtil.getRealOffsets(this.brokerList,
+                        this.topic, convertedCustomProperties, 
partitionOffsets);
             }
+        } catch (LoadException e) {
+            LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
+                    .add("partition:", Joiner.on(",").join(partitionOffsets))
+                    .add("error_msg", "Job failed to fetch current offsets 
with error " + e.getMessage())
+                    .build(), e);
+            throw new UserException(e);
         }
         return partitionOffsets;
     }
@@ -552,6 +555,10 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
             kafkaPartitionOffsets = 
KafkaUtil.getOffsetsForTimes(kafkaDataSourceProperties.getBrokerList(),
                     kafkaDataSourceProperties.getTopic(),
                     convertedCustomProperties, 
kafkaDataSourceProperties.getKafkaPartitionOffsets());
+        } else {
+            kafkaPartitionOffsets = 
KafkaUtil.getRealOffsets(kafkaDataSourceProperties.getBrokerList(),
+                    kafkaDataSourceProperties.getTopic(),
+                    convertedCustomProperties, 
kafkaDataSourceProperties.getKafkaPartitionOffsets());
         }
 
         for (Pair<Integer, Long> partitionOffset : kafkaPartitionOffsets) {
@@ -638,9 +645,9 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     public void modifyProperties(AlterRoutineLoadStmt stmt) throws 
UserException {
         Map<String, String> jobProperties = stmt.getAnalyzedJobProperties();
         KafkaDataSourceProperties dataSourceProperties = 
(KafkaDataSourceProperties) stmt.getDataSourceProperties();
-        if (null != dataSourceProperties && 
dataSourceProperties.isOffsetsForTimes()) {
+        if (null != dataSourceProperties) {
             // if the partition offset is set by timestamp, convert it to real 
offset
-            convertTimestampToOffset(dataSourceProperties);
+            convertOffset(dataSourceProperties);
         }
 
         writeLock();
@@ -659,13 +666,17 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         }
     }
 
-    private void convertTimestampToOffset(KafkaDataSourceProperties 
dataSourceProperties) throws UserException {
+    private void convertOffset(KafkaDataSourceProperties dataSourceProperties) 
throws UserException {
         List<Pair<Integer, Long>> partitionOffsets = 
dataSourceProperties.getKafkaPartitionOffsets();
         if (partitionOffsets.isEmpty()) {
             return;
         }
-        List<Pair<Integer, Long>> newOffsets = 
KafkaUtil.getOffsetsForTimes(brokerList, topic,
-                convertedCustomProperties, partitionOffsets);
+        List<Pair<Integer, Long>> newOffsets;
+        if (dataSourceProperties.isOffsetsForTimes()) {
+            newOffsets = KafkaUtil.getOffsetsForTimes(brokerList, topic, 
convertedCustomProperties, partitionOffsets);
+        } else {
+            newOffsets = KafkaUtil.getRealOffsets(brokerList, topic, 
convertedCustomProperties, partitionOffsets);
+        }
         dataSourceProperties.setKafkaPartitionOffsets(newOffsets);
     }
 
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
index ba3c9ee626a..d188838ad36 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
@@ -31,6 +31,7 @@ import org.apache.doris.catalog.Table;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.LoadException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
@@ -287,6 +288,20 @@ public class KafkaRoutineLoadJobTest {
             }
         };
 
+        new MockUp<KafkaUtil>() {
+            @Mock
+            public List<Pair<Integer, Long>> getRealOffsets(String brokerList, 
String topic,
+                                                             Map<String, 
String> convertedCustomProperties,
+                                                             
List<Pair<Integer, Long>> offsetFlags)
+                                                             throws 
LoadException {
+                List<Pair<Integer, Long>> pairList = new ArrayList<>();
+                pairList.add(Pair.of(1, 0L));
+                pairList.add(Pair.of(2, 0L));
+                pairList.add(Pair.of(3, 0L));
+                return pairList;
+            }
+        };
+
         KafkaRoutineLoadJob kafkaRoutineLoadJob = 
KafkaRoutineLoadJob.fromCreateStmt(createRoutineLoadStmt);
         Assert.assertEquals(jobName, kafkaRoutineLoadJob.getName());
         Assert.assertEquals(dbId, kafkaRoutineLoadJob.getDbId());
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
index 8d903957453..d9494374c03 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
@@ -24,6 +24,8 @@ import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.common.InternalErrorCode;
+import org.apache.doris.common.LoadException;
+import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.jmockit.Deencapsulation;
 import org.apache.doris.datasource.InternalCatalog;
@@ -46,6 +48,7 @@ import org.apache.kafka.common.PartitionInfo;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -274,6 +277,18 @@ public class RoutineLoadJobTest {
             }
         };
 
+        new MockUp<KafkaUtil>() {
+            @Mock
+            public List<Pair<Integer, Long>> getRealOffsets(String brokerList, 
String topic,
+                                                             Map<String, 
String> convertedCustomProperties,
+                                                             
List<Pair<Integer, Long>> offsetFlags)
+                                                             throws 
LoadException {
+                List<Pair<Integer, Long>> pairList = new ArrayList<>();
+                pairList.add(Pair.of(1, 0L));
+                return pairList;
+            }
+        };
+
         RoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob();
         Deencapsulation.setField(routineLoadJob, "state", 
RoutineLoadJob.JobState.RUNNING);
         Deencapsulation.setField(routineLoadJob, "progress", kafkaProgress);
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index 8e0608fd730..12b1b6b1eda 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -391,6 +391,8 @@ message PKafkaMetaProxyRequest {
     repeated PIntegerPair offset_times = 3;
     // optional for getting latest offsets of partitons
     repeated int32 partition_id_for_latest_offsets = 4;
+    // optional for getting real offset for end/beginning flag
+    repeated PIntegerPair offset_flags = 5;
 };
 
 message PProxyRequest {
diff --git 
a/regression-test/data/load_p0/routine_load/test_routine_load_schedule.out 
b/regression-test/data/load_p0/routine_load/test_routine_load_schedule.out
new file mode 100644
index 00000000000..58613e1bdfb
--- /dev/null
+++ b/regression-test/data/load_p0/routine_load/test_routine_load_schedule.out
@@ -0,0 +1,23 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql --
+8      2023-08-14      true    109     -31573  -1362465190     
3990845741226497177     2732763251146840270     -25698.553      
1.312831962567818E9     99999999.9      99999999.9      2023-03-07T14:13:19     
2022-10-18      2023-07-16T05:03:13     D               
PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme        
{"animal":"lion","weight":200,"habitat":["savannah","grassland"]}       true    
1       2       3       4       5       6.0     7.0     888888888       
999999999       2023-08-24      2023-08-24T12:00        2023-08-24      
2023-08-24T12:00        我能吞下玻璃而不伤身体     我能吞下玻璃而不伤身体     我能吞下玻璃而不伤身体     \N
+20     2023-08-17      false   -5      18158   784479801       
1485484354598941738     -6632681928222776815    9708.431        
-3.30432620706069E8     -99999999.9     99999999.9      2022-09-15T21:40:55     
2023-02-23      2023-08-13T21:31:54     O       X       
2pYmX2vAhfEEHZZYPsgAmda1G7otnwx5TmUC879FPhDeIjvWI79ksBZpfFG2gp7jhCSbpZiecKGklB5SvG8tm31i5SUqe1xrWgLt4HSq7lMJWp75tx2kxD7pRIOpn
   {"name":"Sarah","age":30,"city":"London","isMarried":false}     true    1    
   2       3       4       5       6.0     7.0     888888888       999999999    
   2023-08-24      2023-08-24T12:00        2023-08-24      2023-08-24T12:00     
   我能吞下玻璃而不伤身体     我能吞下玻璃而不 [...]
+21     2023-08-18      false   63      -27847  -35409596       
8638201997392767650     4919963231735304178     -23382.541      
-1.803403621426313E9    -22009767.0     99999999.9      2023-03-31T10:56:14     
2023-01-20      2023-02-18T13:37:52     N       T       
PSiFwUEx3eVFNtjlnQ70YkgZNvKrGmQ2DN5K9yYHiSdFWeEDB1UpL3Frt8z1kEAIWRDWqXZuyi      
{"city":"Sydney","population":5312000,"area":2058.7}    true    1       2       
3       4       5       6.0     7.0     888888888       999999999       
2023-08-24      2023-08-24T12:00        2023-08-24      2023-08-24T12:00        
我能吞下玻璃而不伤身体     我能吞下玻璃而不伤身体     我能吞下玻璃而不伤身体     \N
+31     2023-08-27      false   17      -18849  1728109133      
3266501886640700374     527195452623418935      -24062.328      
-1.514348021262435E9    -99999999.9     -99999999.9     2022-10-07T03:24:23     
2022-09-25      \N      0       8       
yKMiAntORoRa8svnMfcxlOPwwND1m5s2fdS26Xu6cfs6HK5SAibqIp9h8sZcpjHy4       
{"team":"Manchester United","players":["Ronaldo","Rooney","Giggs"],"coach":"Ole 
Gunnar Solskjaer"}      true    1       2       3       4       5       6.0     
7.0     888888888       999999999       2023-08-24      2023-08-24T12:00        
2023-08-24      2023-08-24T12:00        我能吞下玻璃而不伤身体     我能吞下玻璃而不伤身体     
我能吞下玻璃而不伤身体     \N
+41     2023-08-27      true    -104    22750   \N      8527773271030840740     
5554497317268279215     -5296.828       -1.71564688801304E9     -99999999.9     
99999999.9      2022-12-02T17:56:44     2022-10-12      2023-02-19T07:02:54     
V       \N      
E9GzQdTwX1ITUQz27IVznAs6Ca4WwprKk6Odjs6SH75D2F1089QiY3HQ52LXRD1V6xAWjhLE2hWgW3EdHuAOnUDVrb5V
    {"food":"Sushi","price":10,"restaurant":"Sushi King"}   true    1       2   
    3       4       5       6.0     7.0     888888888       999999999       
2023-08-24      2023-08-24T12:00        2023-08-24      2023-08-24T12:00        
我能吞下玻璃而不伤身体     我能吞下玻璃而不伤身体     我能吞下玻璃而不伤身体     \N
+49     2023-08-08      false   \N      16275   -2144851675     
-2303421957908954634    -46526938720058765      -13141.143      
-6.866322332302E8       99999999.9      -99999999.9     2022-09-01T00:16:01     
2023-03-25      2022-09-07T14:59:03     s               yvuILR2iNxfe8RRml       
{"student":true,"name":"Alice","grade":9,"subjects":["math","science","history"]}
       true    1       2       3       4       5       6.0     7.0     
888888888       999999999       2023-08-24      2023-08-24T12:00        
2023-08-24      2023-08-24T12:00        我能吞下玻璃而不伤身体     我能吞下玻璃而不伤身体     
我能吞下玻璃而不伤身体     \N
+50     2023-08-06      true    109     -6330   1479023892      
-8630800697573159428    -1645095773540208759    17880.96        
-1.453844792013949E9    -99999999.9     -99999999.9     2022-09-22T02:03:21     
2023-05-14      2023-03-25T02:18:34     m               
JKnIgXvGVidGiWl9YRSi3mFI7wHKt1sBpWSadKF8VX3LAuElm4sdc9gtxREaUr57oikSYlU8We8h1MWqQlYNiJObl
       {"city":"Tokyo","temperature":20.5,"humidity":75}       true    1       
2       3       4       5       6.0     7.0     888888888       999999999       
2023-08-24      2023-08-24T12:00        2023-08-24      2023-08-24T12:00        
我能吞下玻璃而不伤身体     我能吞下玻璃而不伤身体     我能吞下玻璃而不伤身体     \N
+50     2023-08-24      true    15      14403   \N      -6418906115745394180    
9205303779366462513     -4331.549       -6.15112179557648E8     99999999.9      
-99999999.9     2022-12-29T02:27:20     2023-06-01      2023-08-12T04:50:04     
a               
eCl38sztIvBQvGvGKyYZmyMXy9vIJx197iu3JwP9doJGcrYUl9Uova0rz4iCCgrjlAiZU18Fs9YtCq830nhM
    {"band":"The Beatles","members":["John Lennon","Paul McCartney","George 
Harrison","Ringo Starr"]}       true    1       2       3       4       5       
6.0     7.0     888888888       999999999       2023-08-24      
2023-08-24T12:00        2023-08-24      2023-08-24T12:00        我能吞下玻璃而不伤身体     
我能吞下玻璃而不伤身体     我能吞下玻璃而不伤身体     \N
+57     2023-08-19      true    2       -25462  -74112029       
6458082754318544493     -7910671781690629051    -15205.859      
-3.06870797484914E8     99999999.9      -99999999.9     2023-07-10T18:39:10     
2023-02-12      2023-01-27T07:26:06     y               
Xi9nDVrLv8m6AwEpUxmtzFAuK48sQ   {"name":"John","age":25,"city":"New York"}      
true    1       2       3       4       5       6.0     7.0     888888888       
999999999       2023-08-24      2023-08-24T12:00        2023-08-24      
2023-08-24T12:00        我能吞下玻璃而不伤身体     我能吞下玻璃而不伤身体     我能吞下玻璃而不伤身体     \N
+58     2023-08-22      \N      0       -18231  1832867360      
6997858407575297145     2480714305422728023     -5450.489       
1.475901032138386E9     -99999999.9     -99999999.9     2023-02-02T05:13:24     
2022-09-18      2023-04-23T10:51:15     k               
LdFXF7Kmfzgmnn2R6zLsXdmi3A2cLBLq4G4WDVNDhxvH7dYH8Kga2WA47uSIxp6NSrwPSdw0ssB1TS8RFJTDJAB0Uba3e05NL2Aiw0ja
        {"restaurant":"Pizza Hut","menu":["pizza","pasta","salad"]}     true    
1       2       3       4       5       6.0     7.0     888888888       
999999999       2023-08-24      2023-08-24T12:00        2023-08-24      
2023-08-24T12:00        我能吞下玻璃而不伤身体     我能吞下玻璃而不伤身体     我能吞下玻璃而不伤身体     \N
+60     2023-08-27      false   -52     -2338   -757056972      
1047567408607120856     6541476642780646552     6614.0894       
-1.204448798517855E9    99999999.9      99999999.9      2022-12-29T14:47:30     
2022-09-24      2023-08-01T12:41:59     O       F       
RM4F1Ke7lkcnuxF2nK0j9VBW3MDcgyHR4pseBjtFnqS6GUkVFuzF6u3Cp9Nv7ab0O6UYrpP4DhU     
{"game":"Chess","players":2,"time":"1 hour"}    true    1       2       3       
4       5       6.0     7.0     888888888       999999999       2023-08-24      
2023-08-24T12:00        2023-08-24      2023-08-24T12:00        我能吞下玻璃而不伤身体     
我能吞下玻璃而不伤身体     我能吞下玻璃而不伤身体     \N
+62     2023-08-21      false   81      20302   -200761532      
6365479976421007608     \N      -29916.533      1.709141750828478E9     
99999999.9      -99999999.9     2023-05-04T01:14:51     2022-09-17      
2022-12-04T19:30:09     d       v       
BKWy9dTNg1aZW7ancEJAmEDOPK5TwFsNSHbI78emu9gymeIlx5NoLmyii0QAqdzRvSQPZKiqKkwInGCTIBnK1yYkK7zD
    {"username":"user123","password":"pass123","email":"[email protected]"}   
    true    1       2       3       4       5       6.0     7.0     888888888   
    999999999       2023-08-24      2023-08-24T12:00        2023-08-24      
2023-08-24T12:00        我能吞下玻璃而不伤身体     我能吞下玻璃而不伤身体     我能吞下玻璃而不伤身体     \N
+65     2023-08-09      false   94      31514   814994517       
-297697460695940343     734910652450318597      -13061.892      
6.2750847041706E7       -9808654.0      \N      2023-08-14T22:01:27     
2023-05-19      2022-11-13T13:44:28     V               
aGeMsI24O12chGlP5ak0AHghAz7bu5MargJBStHnt0yMnChH0JnfYhsfH1u59XIHkJKMsHYktBqORkGlovu8V47E74KeFpaqxn5yLyXfDbhhzUKf
        {"language":"Python","version":3.9,"frameworks":["Django","Flask"]}     
true    1       2       3       4       5       6.0     7.0     888888888       
999999999       2023-08-24      2023-08-24T12:00        2023-08-24      
2023-08-24T12:00        我能吞下玻璃而不伤身体     我能吞下玻璃而不伤身体     我能吞下玻璃而不伤身体     \N
+66     2023-08-15      true    -91     28378   609923317       
4872185586197131212     1207709464099378591     \N      -1.863683325985123E9    
-99999999.9     -99999999.9     2022-09-24T10:39:23     2022-09-24      
2022-10-16T18:36:43     Y       z       
AI1BSPQdKiHJiQH1kguyLSWsDXkC7zwy7PwgWnyGSaa9tBKRex8vHBdxg2QSKZKL2mV2lHz7iI1PnsTd4MXDcIKhqiHyPuQPt2tEtgt0UgF6
    {"book":{"title":"The Great Gatsby","author":"F. Scott 
Fitzgerald"},"year":1925}        true    1       2       3       4       5      
 6.0     7.0     888888888       999999999       2023-08-24      
2023-08-24T12:00        2023-08-24      2023-08-24T12:00        我能吞下玻璃而不伤身体     
我能吞下玻璃而不伤 [...]
+68     2023-08-23      true    -73     20117   1737338128      
795638676048937749      -5551546237562433901    -30627.04       
6.8589475684545E7       99999999.9      99999999.9      2022-12-28T20:26:51     
2022-10-04      2023-07-30T00:20:06     y               
keZ3JlWWpdnPBejf0cuiCQCVBBTd5gjvO08NVdcAFewqL7nRT4N9lnvSU6pWmletA5VbPQCeQapJdcnQCHfZUDCf4ulCnczyqr7SGrbGRT0XYcd7iktKM
   {"country":"Brazil","continent":"South America","population":211049527} true 
   1       2       3       4       5       6.0     7.0     888888888       
999999999       2023-08-24      2023-08-24T12:00        2023-08-24      
2023-08-24T12:00        我能吞下玻璃而不伤身体     我能吞下玻璃而 [...]
+80     2023-08-18      false   -18     -8971   679027874       
6535956962935330265     3960889045799757165     -13219.76       
1.187161924505394E9     -99999999.9     -99999999.9     2023-03-11T07:40        
2022-11-29      2023-01-14T07:24:07     \N      D       
3Nhx6xX1qdwaq7lxwLRSKMtJFbC03swWv12mpySSVysH3igGZTiGPuKMsYW7HAkf6CWc7c0nzqDsjuH3FYVMNCWRmfxMrmY8rykQCC4Ve
       {"car":"BMW","model":"X5","year":2020,"color":"black"}  true    1       
2       3       4       5       6.0     7.0     888888888       999999999       
2023-08-24      2023-08-24T12:00        2023-08-24      2023-08-24T12:00        
我能吞下玻璃而不伤身体     我能吞下玻璃而不伤身体     我能吞下玻璃而不伤身体     \N
+81     2023-08-23      false   106     11492   -667795397      
4480250461471356146     -5346660566234294101    9082.75 3.85167225902608E8      
-99999999.9     99999999.9      2023-03-20T03:33:16     2022-11-24      
2023-02-16T18:29:41     G       9       
Lk3eNVQNjucbekD1rZmUlGPiXS5JvcWr2LQzRU8GSGIbSag 
{"flower":"rose","color":"red","fragrance":true}        true    1       2       
3       4       5       6.0     7.0     888888888       999999999       
2023-08-24      2023-08-24T12:00        2023-08-24      2023-08-24T12:00        
我能吞下玻璃而不伤身体     我能吞下玻璃而不伤身体     我能吞下玻璃而不伤身体     \N
+85     2023-08-11      true    -7      24304   -2043877415     
-2024144417867729183    \N      5363.0244       -5.78615669042831E8     
-99999999.9     -99999999.9     2023-07-15T01:07:41     2023-08-13      
2023-01-20T11:57:48     i               WQ9dh9ajPu0y    
{"country":"France","capital":"Paris","population":67081000}    true    1       
2       3       4       5       6.0     7.0     888888888       999999999       
2023-08-24      2023-08-24T12:00        2023-08-24      2023-08-24T12:00        
我能吞下玻璃而不伤身体     我能吞下玻璃而不伤身体     我能吞下玻璃而不伤身体     \N
+90     2023-08-27      true    22      16456   -1476824962     
-3279894870153540825    8990195191470116763     26651.906       
2.06860148942546E8      -99999999.9     -99999999.9     2022-10-07T03:11:03     
2023-03-18      2023-04-15T00:38:33     T       L       
QW0GQ3GoMtHgxPQOWGfVaveynahNpsNs09siMFA1OtO6QEDBQTdivmGyq7bFzejAqwbbVQQpREAmeLjcFSXLnQuou2KbwYD
 
{"company":"Apple","products":[{"name":"iPhone","price":1000},{"name":"MacBook","price":1500}]}
 true    1       2       3       4       5       6.0     7.0     888888888      
 999999999       2023-08-24      2023-08-24T12:00        2023-08-24      
2023-08-24T12:00        我能吞下玻璃而不伤身体      [...]
+91     2023-08-27      true    90      2465    702240964       
6373830997821598984     305860046137409400      15991.356       
1.599972327386147E9     -99999999.9     \N      2023-04-26T19:31:10     
2023-07-21      \N      2               
B7YKYBYT8w0YC926bZ8Yz1VzyiWw2NWDAiTlEoPVyz9AXGti2Npg1FxWqWk4hEaALw0ZBSuiAIPj41lq36g5QRpPmAjNPK
  {"fruit":"apple","color":"red","qty":5,"price":2.5}     true    1       2     
  3       4       5       6.0     7.0     888888888       999999999       
2023-08-24      2023-08-24T12:00        2023-08-24      2023-08-24T12:00        
我能吞下玻璃而不伤身体     我能吞下玻璃而不伤身体     我能吞下玻璃而不伤身体     \N
+
diff --git a/regression-test/suites/load_p0/routine_load/data/test_schedule.csv 
b/regression-test/suites/load_p0/routine_load/data/test_schedule.csv
new file mode 100644
index 00000000000..b58285ed575
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/test_schedule.csv
@@ -0,0 +1,20 @@
+57|2023-08-19|TRUE|2|-25462|-74112029|6458082754318544493|-7910671781690629051|-15205.859375|-306870797.484914|759730669.0|-628556336.0|2023-07-10
 18:39:10|2023-02-12|2023-01-27 
07:26:06|y||Xi9nDVrLv8m6AwEpUxmtzFAuK48sQ|{"name": "John", "age": 25, "city": 
"New York"}
+49|2023-08-08|FALSE|\N|16275|-2144851675|-2303421957908954634|-46526938720058765|-13141.142578|-686632233.230200|229942298.0|-152553823.0|2022-09-01
 00:16:01|2023-03-25|2022-09-07 14:59:03|s||yvuILR2iNxfe8RRml|{"student": true, 
"name": "Alice", "grade": 9, "subjects": ["math", "science", "history"]}
+66|2023-08-15|TRUE|-91|28378|609923317|4872185586197131212|1207709464099378591|\N|-1863683325.985123|-783792012.0|-708986976.0|2022-09-24
 10:39:23|2022-09-24|2022-10-16 
18:36:43|Y|z|AI1BSPQdKiHJiQH1kguyLSWsDXkC7zwy7PwgWnyGSaa9tBKRex8vHBdxg2QSKZKL2mV2lHz7iI1PnsTd4MXDcIKhqiHyPuQPt2tEtgt0UgF6|{"book":
 {"title": "The Great Gatsby", "author": "F. Scott Fitzgerald"}, "year": 1925}
+91|2023-08-27|TRUE|90|2465|702240964|6373830997821598984|305860046137409400|15991.356445|1599972327.386147|-165530947.0|\N|2023-04-26
 
19:31:10|2023-07-21|\N|2||B7YKYBYT8w0YC926bZ8Yz1VzyiWw2NWDAiTlEoPVyz9AXGti2Npg1FxWqWk4hEaALw0ZBSuiAIPj41lq36g5QRpPmAjNPK|{"fruit":
 "apple", "color": "red", "qty": 5, "price": 2.5}
+80|2023-08-18|FALSE|-18|-8971|679027874|6535956962935330265|3960889045799757165|-13219.759766|1187161924.505394|-526615878.0|-947410627.0|2023-03-11
 07:40:00|2022-11-29|2023-01-14 
07:24:07|\N|D|3Nhx6xX1qdwaq7lxwLRSKMtJFbC03swWv12mpySSVysH3igGZTiGPuKMsYW7HAkf6CWc7c0nzqDsjuH3FYVMNCWRmfxMrmY8rykQCC4Ve|{"car":
 "BMW", "model": "X5", "year": 2020, "color": "black"}
+85|2023-08-11|TRUE|-7|24304|-2043877415|-2024144417867729183|\N|5363.024414|-578615669.042831|-378574346.0|-810302932.0|2023-07-15
 01:07:41|2023-08-13|2023-01-20 11:57:48|i||WQ9dh9ajPu0y|{"country": "France", 
"capital": "Paris", "population": 67081000}
+31|2023-08-27|FALSE|17|-18849|1728109133|3266501886640700374|527195452623418935|-24062.328125|-1514348021.262435|-322205854.0|-278237157.0|2022-10-07
 
03:24:23|2022-09-25|\N|0|8|yKMiAntORoRa8svnMfcxlOPwwND1m5s2fdS26Xu6cfs6HK5SAibqIp9h8sZcpjHy4|{"team":
 "Manchester United", "players": ["Ronaldo", "Rooney", "Giggs"], "coach": "Ole 
Gunnar Solskjaer"}
+20|2023-08-17|FALSE|-5|18158|784479801|1485484354598941738|-6632681928222776815|9708.430664|-330432620.706069|-816424174.0|571112646.0|2022-09-15
 21:40:55|2023-02-23|2023-08-13 
21:31:54|O|X|2pYmX2vAhfEEHZZYPsgAmda1G7otnwx5TmUC879FPhDeIjvWI79ksBZpfFG2gp7jhCSbpZiecKGklB5SvG8tm31i5SUqe1xrWgLt4HSq7lMJWp75tx2kxD7pRIOpn|{"name":
 "Sarah", "age": 30, "city": "London", "isMarried": false}
+90|2023-08-27|TRUE|22|16456|-1476824962|-3279894870153540825|8990195191470116763|26651.906250|206860148.942546|-580959198.0|-210329147.0|2022-10-07
 03:11:03|2023-03-18|2023-04-15 
00:38:33|T|L|QW0GQ3GoMtHgxPQOWGfVaveynahNpsNs09siMFA1OtO6QEDBQTdivmGyq7bFzejAqwbbVQQpREAmeLjcFSXLnQuou2KbwYD|{"company":
 "Apple", "products": [{"name": "iPhone", "price": 1000}, {"name": "MacBook", 
"price": 1500}]}
+8|2023-08-14|TRUE|109|-31573|-1362465190|3990845741226497177|2732763251146840270|-25698.552734|1312831962.567818|771983879.0|173937916.0|2023-03-07
 14:13:19|2022-10-18|2023-07-16 
05:03:13|D||PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme|{"animal":
 "lion", "weight": 200, "habitat": ["savannah", "grassland"]}
+65|2023-08-09|FALSE|94|31514|814994517|-297697460695940343|734910652450318597|-13061.891602|62750847.041706|-9808654.0|\N|2023-08-14
 22:01:27|2023-05-19|2022-11-13 
13:44:28|V||aGeMsI24O12chGlP5ak0AHghAz7bu5MargJBStHnt0yMnChH0JnfYhsfH1u59XIHkJKMsHYktBqORkGlovu8V47E74KeFpaqxn5yLyXfDbhhzUKf|{"language":
 "Python", "version": 3.9, "frameworks": ["Django", "Flask"]}
+62|2023-08-21|FALSE|81|20302|-200761532|6365479976421007608|\N|-29916.533203|1709141750.828478|549873536.0|-119205359.0|2023-05-04
 01:14:51|2022-09-17|2022-12-04 
19:30:09|d|v|BKWy9dTNg1aZW7ancEJAmEDOPK5TwFsNSHbI78emu9gymeIlx5NoLmyii0QAqdzRvSQPZKiqKkwInGCTIBnK1yYkK7zD|{"username":
 "user123", "password": "pass123", "email": "[email protected]"}
+50|2023-08-06|TRUE|109|-6330|1479023892|-8630800697573159428|-1645095773540208759|17880.960938|-1453844792.013949|-158871820.0|-862940384.0|2022-09-22
 02:03:21|2023-05-14|2023-03-25 
02:18:34|m||JKnIgXvGVidGiWl9YRSi3mFI7wHKt1sBpWSadKF8VX3LAuElm4sdc9gtxREaUr57oikSYlU8We8h1MWqQlYNiJObl|{"city":
 "Tokyo", "temperature": 20.5, "humidity": 75}
+58|2023-08-22|\N|0|-18231|1832867360|6997858407575297145|2480714305422728023|-5450.488770|1475901032.138386|-893480655.0|-607891858.0|2023-02-02
 05:13:24|2022-09-18|2023-04-23 
10:51:15|k||LdFXF7Kmfzgmnn2R6zLsXdmi3A2cLBLq4G4WDVNDhxvH7dYH8Kga2WA47uSIxp6NSrwPSdw0ssB1TS8RFJTDJAB0Uba3e05NL2Aiw0ja|{"restaurant":
 "Pizza Hut", "menu": ["pizza", "pasta", "salad"]}
+60|2023-08-27|FALSE|-52|-2338|-757056972|1047567408607120856|6541476642780646552|6614.089355|-1204448798.517855|236657733.0|731515433.0|2022-12-29
 14:47:30|2022-09-24|2023-08-01 
12:41:59|O|F|RM4F1Ke7lkcnuxF2nK0j9VBW3MDcgyHR4pseBjtFnqS6GUkVFuzF6u3Cp9Nv7ab0O6UYrpP4DhU|{"game":
 "Chess", "players": 2, "time": "1 hour"}
+68|2023-08-23|TRUE|-73|20117|1737338128|795638676048937749|-5551546237562433901|-30627.039062|68589475.684545|585022347.0|513722420.0|2022-12-28
 20:26:51|2022-10-04|2023-07-30 
00:20:06|y||keZ3JlWWpdnPBejf0cuiCQCVBBTd5gjvO08NVdcAFewqL7nRT4N9lnvSU6pWmletA5VbPQCeQapJdcnQCHfZUDCf4ulCnczyqr7SGrbGRT0XYcd7iktKM|{"country":
 "Brazil", "continent": "South America", "population": 211049527}
+50|2023-08-24|TRUE|15|14403|\N|-6418906115745394180|9205303779366462513|-4331.548828|-615112179.557648|367305015.0|-551652958.0|2022-12-29
 02:27:20|2023-06-01|2023-08-12 
04:50:04|a||eCl38sztIvBQvGvGKyYZmyMXy9vIJx197iu3JwP9doJGcrYUl9Uova0rz4iCCgrjlAiZU18Fs9YtCq830nhM|{"band":
 "The Beatles", "members": ["John Lennon", "Paul McCartney", "George Harrison", 
"Ringo Starr"]}
+81|2023-08-23|FALSE|106|11492|-667795397|4480250461471356146|-5346660566234294101|9082.750000|385167225.902608|-717553011.0|649146853.0|2023-03-20
 03:33:16|2022-11-24|2023-02-16 
18:29:41|G|9|Lk3eNVQNjucbekD1rZmUlGPiXS5JvcWr2LQzRU8GSGIbSag|{"flower": "rose", 
"color": "red", "fragrance": true}
+41|2023-08-27|TRUE|-104|22750|\N|8527773271030840740|5554497317268279215|-5296.828125|-1715646888.013040|-306075962.0|897769189.0|2022-12-02
 17:56:44|2022-10-12|2023-02-19 
07:02:54|V|\N|E9GzQdTwX1ITUQz27IVznAs6Ca4WwprKk6Odjs6SH75D2F1089QiY3HQ52LXRD1V6xAWjhLE2hWgW3EdHuAOnUDVrb5V|{"food":
 "Sushi", "price": 10, "restaurant": "Sushi King"}
+21|2023-08-18|FALSE|63|-27847|-35409596|8638201997392767650|4919963231735304178|-23382.541016|-1803403621.426313|-22009767.0|661750756.0|2023-03-31
 10:56:14|2023-01-20|2023-02-18 
13:37:52|N|T|PSiFwUEx3eVFNtjlnQ70YkgZNvKrGmQ2DN5K9yYHiSdFWeEDB1UpL3Frt8z1kEAIWRDWqXZuyi|{"city":
 "Sydney", "population": 5312000, "area": 2058.7}
\ No newline at end of file
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routine_load_schedule.groovy 
b/regression-test/suites/load_p0/routine_load/test_routine_load_schedule.groovy
new file mode 100644
index 00000000000..c8044ad1404
--- /dev/null
+++ 
b/regression-test/suites/load_p0/routine_load/test_routine_load_schedule.groovy
@@ -0,0 +1,222 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+
+// Regression test for routine load scheduling: a job created with
+// OFFSET_END on a topic that already has data must not schedule tasks
+// until NEW data arrives, then must load exactly the newly produced rows.
+suite("test_routine_load_schedule","p0") {
+    // Topics whose data/<topic>.csv fixture is produced into Kafka.
+    def kafkaCsvTopics = [
+                  "test_schedule",
+                ]
+
+    String enabled = context.config.otherConfigs.get("enableKafkaTest")
+    String kafka_port = context.config.otherConfigs.get("kafka_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    def kafka_broker = "${externalEnvIp}:${kafka_port}"
+
+    // Produce every line of each topic's csv fixture to Kafka. Used twice:
+    // once before job creation (consumed offsets start at OFFSET_END, so this
+    // data is ignored) and once after, to trigger actual scheduling.
+    def produceFixtureData = {
+        def props = new Properties()
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        def producer = new KafkaProducer<>(props)
+        for (String kafkaCsvTopic in kafkaCsvTopics) {
+            def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+            def lines = txt.readLines()
+            lines.each { line ->
+                logger.info("=====${line}========")
+                def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                producer.send(record)
+            }
+        }
+    }
+
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        produceFixtureData()
+    }
+
+    // Let the pre-job data settle in Kafka before the job is created.
+    sleep(10000)
+
+    def jobName = "testScheduleJob"
+    def tableName = "test_routine_load_schedule"
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        try {
+            sql """
+            CREATE TABLE IF NOT EXISTS ${tableName}
+            (
+                k00 INT             NOT NULL,
+                k01 DATE            NOT NULL,
+                k02 BOOLEAN         NULL,
+                k03 TINYINT         NULL,
+                k04 SMALLINT        NULL,
+                k05 INT             NULL,
+                k06 BIGINT          NULL,
+                k07 LARGEINT        NULL,
+                k08 FLOAT           NULL,
+                k09 DOUBLE          NULL,
+                k10 DECIMAL(9,1)    NULL,
+                k11 DECIMALV3(9,1)  NULL,
+                k12 DATETIME        NULL,
+                k13 DATEV2          NULL,
+                k14 DATETIMEV2      NULL,
+                k15 CHAR            NULL,
+                k16 VARCHAR         NULL,
+                k17 STRING          NULL,
+                k18 JSON            NULL,
+                kd01 BOOLEAN         NOT NULL DEFAULT "TRUE",
+                kd02 TINYINT         NOT NULL DEFAULT "1",
+                kd03 SMALLINT        NOT NULL DEFAULT "2",
+                kd04 INT             NOT NULL DEFAULT "3",
+                kd05 BIGINT          NOT NULL DEFAULT "4",
+                kd06 LARGEINT        NOT NULL DEFAULT "5",
+                kd07 FLOAT           NOT NULL DEFAULT "6.0",
+                kd08 DOUBLE          NOT NULL DEFAULT "7.0",
+                kd09 DECIMAL         NOT NULL DEFAULT "888888888",
+                kd10 DECIMALV3       NOT NULL DEFAULT "999999999",
+                kd11 DATE            NOT NULL DEFAULT "2023-08-24",
+                kd12 DATETIME        NOT NULL DEFAULT "2023-08-24 12:00:00",
+                kd13 DATEV2          NOT NULL DEFAULT "2023-08-24",
+                kd14 DATETIMEV2      NOT NULL DEFAULT "2023-08-24 12:00:00",
+                kd15 CHAR(255)       NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+                kd16 VARCHAR(300)    NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+                kd17 STRING          NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+                kd18 JSON            NULL,
+
+                INDEX idx_inverted_k104 (`k05`) USING INVERTED,
+                INDEX idx_inverted_k110 (`k11`) USING INVERTED,
+                INDEX idx_inverted_k113 (`k13`) USING INVERTED,
+                INDEX idx_inverted_k114 (`k14`) USING INVERTED,
+                INDEX idx_inverted_k117 (`k17`) USING INVERTED PROPERTIES("parser" = "english"),
+                INDEX idx_ngrambf_k115 (`k15`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
+                INDEX idx_ngrambf_k116 (`k16`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
+                INDEX idx_ngrambf_k117 (`k17`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"),
+
+                INDEX idx_bitmap_k104 (`k02`) USING BITMAP,
+                INDEX idx_bitmap_k110 (`kd01`) USING BITMAP
+
+            )
+            DUPLICATE KEY(k00)
+            PARTITION BY RANGE(k01)
+            (
+                PARTITION p1 VALUES [('2023-08-01'), ('2023-08-11')),
+                PARTITION p2 VALUES [('2023-08-11'), ('2023-08-21')),
+                PARTITION p3 VALUES [('2023-08-21'), ('2023-09-01'))
+            )
+            DISTRIBUTED BY HASH(k00) BUCKETS 32
+            PROPERTIES (
+                "bloom_filter_columns"="k05",
+                "replication_num" = "1"
+            );
+            """
+            sql "sync"
+
+            // OFFSET_END: the job must not see the fixture data already in
+            // the topic, so no task should be scheduled until new data lands.
+            sql """
+                CREATE ROUTINE LOAD ${jobName} on ${tableName}
+                COLUMNS(k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18),
+                COLUMNS TERMINATED BY "|"
+                PROPERTIES
+                (
+                    "max_batch_interval" = "5",
+                    "max_batch_rows" = "300000",
+                    "max_batch_size" = "209715200"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "test_schedule",
+                    "property.kafka_default_offsets" = "OFFSET_END"
+                );
+            """
+            sql "sync"
+
+            // Wait (up to ~300s) for the job to reach RUNNING after creation.
+            def count = 0
+            while (true) {
+                sleep(1000)
+                def res = sql "show routine load for ${jobName}"
+                def state = res[0][8].toString()
+                if (state != "RUNNING") {
+                    count++
+                    if (count > 300) {
+                        assertEquals(1, 2)
+                    }
+                    continue;
+                }
+                break;
+            }
+
+            // Now produce fresh data; these rows are past OFFSET_END and
+            // must trigger real task scheduling.
+            produceFixtureData()
+
+            sleep(5000)
+
+            // Wait (up to ~60s) for the job to stay/return RUNNING with data.
+            count = 0
+            while (true) {
+                sleep(1000)
+                def res = sql "show routine load for ${jobName}"
+                def state = res[0][8].toString()
+                if (state != "RUNNING") {
+                    count++
+                    if (count > 60) {
+                        assertEquals(1, 2)
+                    }
+                    continue;
+                }
+                log.info("reason of state changed: ${res[0][11].toString()}".toString())
+                break;
+            }
+
+            // Wait (up to ~60s) for exactly the 20 fixture rows to be loaded.
+            // Reset the counter so this loop gets its own full timeout budget
+            // instead of inheriting ticks from the loop above.
+            count = 0
+            while (true) {
+                sleep(1000)
+                def res = sql "show routine load for ${jobName}"
+                log.info("routine load statistic: ${res[0][14].toString()}".toString())
+                log.info("progress: ${res[0][15].toString()}".toString())
+                log.info("lag: ${res[0][16].toString()}".toString())
+                def json = parseJson(res[0][14])
+                if (json.loadedRows.toString() != "20") {
+                    count++
+                    if (count > 60) {
+                        assertEquals(1, 2)
+                    }
+                    continue;
+                }
+                break;
+            }
+            qt_sql "select * from ${tableName} order by k00, k01"
+        } finally {
+            sql "stop routine load for ${jobName}"
+            sql "DROP TABLE IF EXISTS ${tableName}"
+        }
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to