This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 07ad038  [Feature][RoutineLoad] Support for consuming Kafka from a point in time (#5832)
07ad038 is described below

commit 07ad03887094fd4b70c98d13ce79f6a7debb92ac
Author: Mingyu Chen <[email protected]>
AuthorDate: Sat May 22 23:37:53 2021 +0800

    [Feature][RoutineLoad] Support for consuming Kafka from a point in time (#5832)
    
    Support starting consumption from a specified point in time, instead of from a specific offset, when creating a Kafka routine load.
    e.g.:
    ```
    FROM KAFKA
    (
        "kafka_broker_list" = "broker1:9092,broker2:9092",
        "kafka_topic" = "my_topic",
        "property.kafka_default_offsets" = "2021-10-10 11:00:00"
    );
    
    or
    
    FROM KAFKA
    (
        "kafka_broker_list" = "broker1:9092,broker2:9092",
        "kafka_topic" = "my_topic",
        "kafka_partitions" = "0,1,2",
        "kafka_offsets" = "2021-10-10 11:00:00, 2021-10-10 11:00:00, 2021-10-10 
12:00:00"
    );
    ```
    
    This PR also refactors how properties are analyzed when creating or altering routine load jobs, unifying the analysis in the `RoutineLoadDataSourceProperties` class.
    A datetime offset is converted to a timestamp on the FE (using the job's timezone) and resolved to an actual offset on the BE via librdkafka's `offsetsForTimes()`.
---
 be/src/runtime/routine_load/data_consumer.cpp      |  43 +++
 be/src/runtime/routine_load/data_consumer.h        |   4 +
 .../routine_load/routine_load_task_executor.cpp    |  50 ++-
 .../routine_load/routine_load_task_executor.h      |   5 +
 be/src/service/internal_service.cpp                |  43 ++-
 .../load-data/routine-load-manual.md               |  21 ++
 .../Data Manipulation/ROUTINE LOAD.md              |  25 ++
 .../load-data/routine-load-manual.md               |  20 ++
 .../Data Manipulation/ROUTINE LOAD.md              |  23 ++
 fe/fe-core/src/main/cup/sql_parser.cup             |   3 +-
 .../doris/analysis/AlterRoutineLoadStmt.java       |  15 +-
 .../doris/analysis/CreateRoutineLoadStmt.java      | 216 ++-----------
 .../analysis/RoutineLoadDataSourceProperties.java  | 309 ++++++++++++++++--
 .../org/apache/doris/common/util/KafkaUtil.java    |  74 +++++
 .../org/apache/doris/common/util/TimeUtils.java    |  16 +-
 .../load/routineload/KafkaRoutineLoadJob.java      |  72 ++++-
 .../doris/load/routineload/RoutineLoadJob.java     |  10 +-
 .../doris/load/routineload/RoutineLoadManager.java |   2 +-
 .../doris/analysis/AlterRoutineLoadStmtTest.java   |  22 +-
 .../RoutineLoadDataSourcePropertiesTest.java       | 347 +++++++++++++++++++++
 .../load/routineload/KafkaRoutineLoadJobTest.java  |  20 +-
 .../persist/AlterRoutineLoadOperationLogTest.java  |  12 +-
 gensrc/proto/internal_service.proto                |  11 +
 23 files changed, 1086 insertions(+), 277 deletions(-)

diff --git a/be/src/runtime/routine_load/data_consumer.cpp b/be/src/runtime/routine_load/data_consumer.cpp
index 301ad24..ea712c9 100644
--- a/be/src/runtime/routine_load/data_consumer.cpp
+++ b/be/src/runtime/routine_load/data_consumer.cpp
@@ -23,6 +23,7 @@
 #include <vector>
 
 #include "common/status.h"
+#include "gen_cpp/internal_service.pb.h"
 #include "gutil/strings/split.h"
 #include "runtime/small_file_mgr.h"
 #include "service/backend_options.h"
@@ -293,6 +294,48 @@ Status KafkaDataConsumer::get_partition_meta(std::vector<int32_t>* partition_ids
     return Status::OK();
 }
 
+// Get the offset of each partition for the given timestamps.
+// The input parameter "times" holds <partition, timestamp> pairs.
+// The output parameter "offsets" returns <partition, offset> pairs.
+//
+// The returned offset for each partition is the earliest offset whose
+// timestamp is greater than or equal to the given timestamp in the
+// corresponding partition.
+// See librdkafka/rdkafkacpp.h##offsetsForTimes()
+Status KafkaDataConsumer::get_offsets_for_times(const std::vector<PIntegerPair>& times,
+        std::vector<PIntegerPair>* offsets) {
+    // create topic partition
+    std::vector<RdKafka::TopicPartition*> topic_partitions;
+    for (const auto& entry : times) {
+        RdKafka::TopicPartition* tp1 =
+                RdKafka::TopicPartition::create(_topic, entry.key(), entry.val());
+        topic_partitions.push_back(tp1);
+    }
+    // delete the TopicPartitions when leaving this scope
+    Defer delete_tp{[&topic_partitions]() {
+        std::for_each(topic_partitions.begin(), topic_partitions.end(),
+                      [](RdKafka::TopicPartition* tp1) { delete tp1; });
+    }};
+
+    // get offsets for times
+    RdKafka::ErrorCode err = _k_consumer->offsetsForTimes(topic_partitions, 5000);
+    if (err != RdKafka::ERR_NO_ERROR) {
+        std::stringstream ss;
+        ss << "failed to get offsets for times: " << RdKafka::err2str(err);
+        LOG(WARNING) << ss.str();
+        return Status::InternalError(ss.str());
+    }
+
+    for (const auto& topic_partition : topic_partitions) {
+        PIntegerPair pair;
+        pair.set_key(topic_partition->partition());
+        pair.set_val(topic_partition->offset());
+        offsets->push_back(pair);
+    }
+
+    return Status::OK();
+}
+
 Status KafkaDataConsumer::cancel(StreamLoadContext* ctx) {
     std::unique_lock<std::mutex> l(_lock);
     if (!_init) {
diff --git a/be/src/runtime/routine_load/data_consumer.h b/be/src/runtime/routine_load/data_consumer.h
index f0c1c6e..fcbd017 100644
--- a/be/src/runtime/routine_load/data_consumer.h
+++ b/be/src/runtime/routine_load/data_consumer.h
@@ -77,6 +77,7 @@ protected:
     time_t _last_visit_time;
 };
 
+class PIntegerPair;
 class KafkaEventCb : public RdKafka::EventCb {
 public:
     void event_cb(RdKafka::Event& event) {
@@ -141,6 +142,9 @@ public:
 
     // get the partitions ids of the topic
     Status get_partition_meta(std::vector<int32_t>* partition_ids);
+    // get offsets for times
+    Status get_offsets_for_times(const std::vector<PIntegerPair>& times,
+            std::vector<PIntegerPair>* offsets);
 
 private:
     std::string _brokers;
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index 8bcbfa9..f8f9c71 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -63,15 +63,13 @@ RoutineLoadTaskExecutor::~RoutineLoadTaskExecutor() {
     _task_map.clear();
 }
 
-Status RoutineLoadTaskExecutor::get_kafka_partition_meta(const PKafkaMetaProxyRequest& request,
-                                                         std::vector<int32_t>* partition_ids) {
-    DCHECK(request.has_kafka_info());
-
-    // This context is meaningless, just for unifing the interface
-    StreamLoadContext ctx(_exec_env);
-    ctx.load_type = TLoadType::ROUTINE_LOAD;
-    ctx.load_src_type = TLoadSourceType::KAFKA;
-    ctx.label = "NaN";
+// Create a temp StreamLoadContext and set some kafka connection info in it,
+// so that we can use this ctx to get a kafka data consumer instance.
+Status RoutineLoadTaskExecutor::_prepare_ctx(const PKafkaMetaProxyRequest& request,
+                                             StreamLoadContext* ctx) {
+    ctx->load_type = TLoadType::ROUTINE_LOAD;
+    ctx->load_src_type = TLoadSourceType::KAFKA;
+    ctx->label = "NaN";
 
     // convert PKafkaInfo to TKafkaLoadInfo
     TKafkaLoadInfo t_info;
@@ -84,8 +82,18 @@ Status RoutineLoadTaskExecutor::get_kafka_partition_meta(const PKafkaMetaProxyRe
     }
     t_info.__set_properties(std::move(properties));
 
-    ctx.kafka_info.reset(new KafkaLoadInfo(t_info));
-    ctx.need_rollback = false;
+    ctx->kafka_info.reset(new KafkaLoadInfo(t_info));
+    ctx->need_rollback = false;
+    return Status::OK();
+}
+
+Status RoutineLoadTaskExecutor::get_kafka_partition_meta(const PKafkaMetaProxyRequest& request,
+                                                         std::vector<int32_t>* partition_ids) {
+    CHECK(request.has_kafka_info());
+
+    // This context is meaningless, just for unifying the interface
+    StreamLoadContext ctx(_exec_env);
+    RETURN_IF_ERROR(_prepare_ctx(request, &ctx));
 
     std::shared_ptr<DataConsumer> consumer;
     RETURN_IF_ERROR(_data_consumer_pool.get_consumer(&ctx, &consumer));
@@ -98,6 +106,26 @@ Status RoutineLoadTaskExecutor::get_kafka_partition_meta(const PKafkaMetaProxyRe
     return st;
 }
 
+Status RoutineLoadTaskExecutor::get_kafka_partition_offsets_for_times(const PKafkaMetaProxyRequest& request,
+        std::vector<PIntegerPair>* partition_offsets) {
+    CHECK(request.has_kafka_info());
+
+    // This context is meaningless, just for unifying the interface
+    StreamLoadContext ctx(_exec_env);
+    RETURN_IF_ERROR(_prepare_ctx(request, &ctx));
+
+    std::shared_ptr<DataConsumer> consumer;
+    RETURN_IF_ERROR(_data_consumer_pool.get_consumer(&ctx, &consumer));
+
+    Status st = std::static_pointer_cast<KafkaDataConsumer>(consumer)->get_offsets_for_times(
+            std::vector<PIntegerPair>(request.offset_times().begin(), request.offset_times().end()),
+            partition_offsets);
+    if (st.ok()) {
+        _data_consumer_pool.return_consumer(consumer);
+    }
+    return st;
+}
+
 Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
     std::unique_lock<std::mutex> l(_lock);
     if (_task_map.find(task.id) != _task_map.end()) {
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h b/be/src/runtime/routine_load/routine_load_task_executor.h
index ffc8b2c..e5c7939 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.h
+++ b/be/src/runtime/routine_load/routine_load_task_executor.h
@@ -52,6 +52,9 @@ public:
     Status get_kafka_partition_meta(const PKafkaMetaProxyRequest& request,
                                     std::vector<int32_t>* partition_ids);
 
+    Status get_kafka_partition_offsets_for_times(const PKafkaMetaProxyRequest& request,
+        std::vector<PIntegerPair>* partition_offsets);
+
 private:
     // execute the task
     void exec_task(StreamLoadContext* ctx, DataConsumerPool* pool, ExecFinishCallback cb);
@@ -60,6 +63,8 @@ private:
 
     // for test only
     Status _execute_plan_for_test(StreamLoadContext* ctx);
+    // create a dummy StreamLoadContext for PKafkaMetaProxyRequest
+    Status _prepare_ctx(const PKafkaMetaProxyRequest& request, StreamLoadContext* ctx);
 
 private:
     ExecEnv* _exec_env;
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 62e917f..8023751 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -187,19 +187,42 @@ void PInternalServiceImpl<T>::get_info(google::protobuf::RpcController* controll
                                        const PProxyRequest* request, PProxyResult* response,
                                        google::protobuf::Closure* done) {
     brpc::ClosureGuard closure_guard(done);
+    // PProxyRequest is defined in gensrc/proto/internal_service.proto
+    // Currently it supports 2 kinds of requests:
+    // 1. get all kafka partition ids for given topic
+    // 2. get all kafka partition offsets for given topic and timestamp.
     if (request->has_kafka_meta_request()) {
-        std::vector<int32_t> partition_ids;
-        Status st = _exec_env->routine_load_task_executor()->get_kafka_partition_meta(
-                request->kafka_meta_request(), &partition_ids);
-        if (st.ok()) {
-            PKafkaMetaProxyResult* kafka_result = response->mutable_kafka_meta_result();
-            for (int32_t id : partition_ids) {
-                kafka_result->add_partition_ids(id);
+        const PKafkaMetaProxyRequest& kafka_request = request->kafka_meta_request();
+        if (!kafka_request.offset_times().empty()) {
+            // if offset_times() is not empty, this request is to get offsets by timestamp.
+            std::vector<PIntegerPair> partition_offsets;
+            Status st = _exec_env->routine_load_task_executor()->get_kafka_partition_offsets_for_times(
+                    request->kafka_meta_request(), &partition_offsets);
+            if (st.ok()) {
+                PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets();
+                for (const auto& entry : partition_offsets) {
+                    PIntegerPair* res = part_offsets->add_offset_times();
+                    res->set_key(entry.key());
+                    res->set_val(entry.val());
+                }
             }
+            st.to_protobuf(response->mutable_status());
+            return;
+        } else {
+            // get partition ids of topic
+            std::vector<int32_t> partition_ids;
+            Status st = _exec_env->routine_load_task_executor()->get_kafka_partition_meta(
+                    request->kafka_meta_request(), &partition_ids);
+            if (st.ok()) {
+                PKafkaMetaProxyResult* kafka_result = response->mutable_kafka_meta_result();
+                for (int32_t id : partition_ids) {
+                    kafka_result->add_partition_ids(id);
+                }
+            }
+            st.to_protobuf(response->mutable_status());
+            return;
         }
-        st.to_protobuf(response->mutable_status());
-        return;
-    }
+    } 
     Status::OK().to_protobuf(response->mutable_status());
 }
 
diff --git a/docs/en/administrator-guide/load-data/routine-load-manual.md b/docs/en/administrator-guide/load-data/routine-load-manual.md
index 592ebe5..ab29901 100644
--- a/docs/en/administrator-guide/load-data/routine-load-manual.md
+++ b/docs/en/administrator-guide/load-data/routine-load-manual.md
@@ -273,11 +273,32 @@ The user can control the stop, pause and restart of the job by the three command
     * If the broker of the user's kafka cluster has `auto.create.topics.enable = false` set, the topic will not be created automatically, and the routine load job will be paused before any data is read, with the status `PAUSED`.
 
    So, if the user wants the topic to be created automatically by the routine load job when it does not exist, just set `auto.create.topics.enable = true` for the brokers of the user's kafka cluster.
+    
 5. Problems that may occur in some environments
      In some environments, there are isolation measures for network segments and domain name resolution, so pay attention to:
         1. The broker list specified in the routine load task must be accessible from the Doris environment.
         2. If `advertised.listeners` is configured in kafka, the addresses in `advertised.listeners` need to be accessible from the Doris environment.
 
+6. About specifying partitions and offsets
+
+    Doris supports specifying the partitions and offsets from which to start consumption. The new version also supports starting consumption at a specified point in time. This section explains how the corresponding parameters are configured together.
+    
+    There are three relevant parameters:
+    
+    * `kafka_partitions`: Specify the list of partitions to be consumed, such as: "0, 1, 2, 3".
+    * `kafka_offsets`: Specify the starting offset of each partition; the number of offsets must match the number of entries in the `kafka_partitions` list. Such as: "1000, 1000, 2000, 2000"
+    * `property.kafka_default_offsets`: Specify the default starting offset of the partitions.
+
+    When creating a routine load job, these three parameters can be combined as follows:
+    
+    | Combination | `kafka_partitions` | `kafka_offsets` | `property.kafka_default_offsets` | Behavior |
+    |---|---|---|---|---|
+    |1| No | No | No | The system will automatically find all partitions of the topic and start consumption from OFFSET_END |
+    |2| No | No | Yes | The system will automatically find all partitions of the topic and start consumption from the position specified by the default offset |
+    |3| Yes | No | No | The system will start consumption from OFFSET_END of the specified partitions |
+    |4| Yes | Yes | No | The system will start consumption from the specified offsets of the specified partitions |
+    |5| Yes | No | Yes | The system will start consumption from the specified partitions, at the position specified by the default offset |
+
 ## Related parameters
 
 Some system configuration parameters can affect the use of routine loads.
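
As a worked illustration of combination 5 in the table above (partitions given, offsets filled from the default offset), the data source clause of a job consuming partitions 0-2 from a point in time would presumably look like the following; broker addresses and the topic name are placeholders:

```
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092,broker2:9092",
    "kafka_topic" = "my_topic",
    "kafka_partitions" = "0,1,2",
    "property.kafka_default_offsets" = "2021-10-10 11:00:00"
);
```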
diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md b/docs/en/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md
index ea6487f..ee0e775 100644
--- a/docs/en/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md    
+++ b/docs/en/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md    
@@ -243,6 +243,9 @@ FROM data_source
 
         2) OFFSET_END: Subscribe from the end.
 
+        3) Timestamp, the format must be like: "2021-05-11 10:00:00"; the system will automatically locate the offset of the first message whose timestamp is greater than or equal to it.
+           Note that timestamp-format offsets cannot be mixed with numeric offsets; only one of the two formats can be used.
+
         If not specified, all partitions under the topic are subscribed by default from OFFSET_END.
 
         Example:
@@ -250,6 +253,9 @@ FROM data_source
         ```
         "kafka_partitions" = "0,1,2,3",
         "kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END"
+
+        "kafka_partitions" = "0,1",
+        "kafka_offsets" = "2021-05-11 10:00:00, 2021-05-11 11:00:00"
         ```
 
     4. property
@@ -305,9 +311,12 @@ FROM data_source
 
             2) OFFSET_END: Subscribe from the end.
 
+            3) Timestamp, the format is the same as kafka_offsets
+
             Example:
 
             `"property.kafka_default_offsets" = "OFFSET_BEGINNING"`
+            `"property.kafka_default_offsets" = "2021-05-11 10:00:00"`
 
 8. load data format sample
 
@@ -552,6 +561,22 @@ FROM data_source
             "kafka_offsets" = "101,0,0,200"
         );
 
+    9. Start consumption from a specified point in time
+
+       CREATE ROUTINE LOAD example_db.test_job ON example_tbl
+       PROPERTIES
+       (
+           "desired_concurrent_number"="3",
+           "max_batch_interval" = "30",
+           "max_batch_rows" = "300000",
+           "max_batch_size" = "209715200"
+       ) FROM KAFKA
+       (
+           "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
+           "kafka_topic" = "my_topic",
+           "property.kafka_default_offsets" = "2021-10-10 11:00:00"
+       );
+
 ## keyword
 
     CREATE, ROUTINE, LOAD
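
The unified `RoutineLoadDataSourceProperties` is also used for ALTER ROUTINE LOAD (see the sql_parser.cup change below), so an existing job's default offset can presumably be moved to a point in time. A hedged sketch, assuming the standard ALTER ROUTINE LOAD syntax and a hypothetical job name:

```
ALTER ROUTINE LOAD FOR example_db.test_job
FROM KAFKA
(
    "property.kafka_default_offsets" = "2021-10-10 11:00:00"
);
```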
diff --git a/docs/zh-CN/administrator-guide/load-data/routine-load-manual.md b/docs/zh-CN/administrator-guide/load-data/routine-load-manual.md
index 0a28012..976446b 100644
--- a/docs/zh-CN/administrator-guide/load-data/routine-load-manual.md
+++ b/docs/zh-CN/administrator-guide/load-data/routine-load-manual.md
@@ -277,6 +277,26 @@ Based on the reported results, the JobScheduler in the FE continues to generate new Tasks, or
     1. The broker list specified when creating the routine load task must be accessible by the Doris service
     2. If `advertised.listeners` is configured in Kafka, the addresses in `advertised.listeners` must be accessible by the Doris service
 
+6. About specifying the partitions and offsets to consume
+
+    Doris supports specifying the partitions and offsets from which to start consumption. The new version also supports starting consumption at a specified point in time. This section explains how the corresponding parameters are configured together.
+    
+    There are three relevant parameters:
+    
+    * `kafka_partitions`: the list of partitions to consume, e.g. "0, 1, 2, 3".
+    * `kafka_offsets`: the starting offset of each partition; the count must match the number of entries in the `kafka_partitions` list, e.g. "1000, 1000, 2000, 2000".
+    * `property.kafka_default_offsets`: the default starting offset of the partitions.
+
+    When creating a routine load job, these three parameters can be combined as follows:
+    
+    | Combination | `kafka_partitions` | `kafka_offsets` | `property.kafka_default_offsets` | Behavior |
+    |---|---|---|---|---|
+    |1| No | No | No | The system will automatically find all partitions of the topic and start consumption from OFFSET_END |
+    |2| No | No | Yes | The system will automatically find all partitions of the topic and start consumption from the position specified by the default offset |
+    |3| Yes | No | No | The system will start consumption from OFFSET_END of the specified partitions |
+    |4| Yes | Yes | No | The system will start consumption from the specified offsets of the specified partitions |
+    |5| Yes | No | Yes | The system will start consumption from the specified partitions, at the position specified by the default offset |
+    
 ## Related parameters
 
 Some system configuration parameters can affect the use of routine load.
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md
index 65fd54c..4200ad4 100644
--- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md 
+++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md 
@@ -220,6 +220,7 @@ under the License.
                 offset can be a specific offset greater than or equal to 0, or:
                 1) OFFSET_BEGINNING: subscribe from the position where data exists.
                 2) OFFSET_END: subscribe from the end.
+                3) Timestamp, in a format like "2021-05-11 10:00:00"; the system will automatically locate the offset of the first message whose timestamp is greater than or equal to it. Note that timestamp-format offsets cannot be mixed with numeric offsets; only one of the two can be used.
 
                 If not specified, all partitions under the topic are subscribed by default from OFFSET_END.
                 Example:
@@ -227,6 +228,9 @@ under the License.
                     "kafka_partitions" = "0,1,2,3",
                     "kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END"
 
+                    "kafka_partitions" = "0,1",
+                    "kafka_offsets" = "2021-05-11 10:00:00, 2021-05-11 
11:00:00"
+                    
             4. property
 
                 Specify custom kafka parameters.
@@ -264,8 +268,11 @@ under the License.
                 The value can be:
                     1) OFFSET_BEGINNING: subscribe from the position where data exists.
                     2) OFFSET_END: subscribe from the end.
+                    3) Timestamp, in the same format as kafka_offsets
+
                     Example:
                     "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                    "property.kafka_default_offsets" = "2021-05-11 10:00:00"
 
      8. Load data format example
 
@@ -507,6 +514,22 @@ under the License.
             "kafka_offsets" = "101,0,0,200"
         );
 
+    9. Start consuming from a specified point in time
+
+        CREATE ROUTINE LOAD example_db.test_job ON example_tbl
+        PROPERTIES
+        (
+            "desired_concurrent_number"="3",
+            "max_batch_interval" = "30",
+            "max_batch_rows" = "300000",
+            "max_batch_size" = "209715200"
+        ) FROM KAFKA
+        (
+            "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
+            "kafka_topic" = "my_topic",
+            "property.kafka_default_offsets" = "2021-10-10 11:00:00"
+        );
+
 ## keyword
 
     CREATE,ROUTINE,LOAD
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup
index ea6c6d6..02aa5ed 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -803,7 +803,8 @@ opt_datasource_properties ::=
     :}
     | KW_FROM ident:type LPAREN key_value_map:customProperties RPAREN
     {:
-        RESULT = new RoutineLoadDataSourceProperties(type, customProperties);
+        // the 3rd parameter "true" means this is for AlterRoutineLoad 
operation.
+        RESULT = new RoutineLoadDataSourceProperties(type, customProperties, 
true);
     :}
     ;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java
index 47fb380..0bd3f6f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java
@@ -17,11 +17,14 @@
 
 package org.apache.doris.analysis;
 
+import org.apache.doris.catalog.Catalog;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.FeNameFormat;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.common.util.Util;
+import org.apache.doris.load.routineload.RoutineLoadJob;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
@@ -42,7 +45,7 @@ public class AlterRoutineLoadStmt extends DdlStmt {
 
     private static final String NAME_TYPE = "ROUTINE LOAD NAME";
 
-    private static final ImmutableSet<String> CONFIGURABLE_PROPERTIES_SET = new ImmutableSet.Builder<String>()
+    private static final ImmutableSet<String> CONFIGURABLE_JOB_PROPERTIES_SET = new ImmutableSet.Builder<String>()
             .add(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY)
             .add(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY)
             .add(CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY)
@@ -110,7 +113,7 @@ public class AlterRoutineLoadStmt extends DdlStmt {
 
     private void checkJobProperties() throws UserException {
         Optional<String> optional = jobProperties.keySet().stream().filter(
-                entity -> !CONFIGURABLE_PROPERTIES_SET.contains(entity)).findFirst();
+                entity -> !CONFIGURABLE_JOB_PROPERTIES_SET.contains(entity)).findFirst();
         if (optional.isPresent()) {
             throw new AnalysisException(optional.get() + " is invalid property");
         }
@@ -194,7 +197,13 @@ public class AlterRoutineLoadStmt extends DdlStmt {
         }
     }
 
-    private void checkDataSourceProperties() throws AnalysisException {
+    private void checkDataSourceProperties() throws UserException {
+        if (!FeConstants.runningUnitTest) {
+            RoutineLoadJob job = Catalog.getCurrentCatalog().getRoutineLoadManager().checkPrivAndGetJob(getDbName(), getLabel());
+            dataSourceProperties.setTimezone(job.getTimezone());
+        } else {
+            dataSourceProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE);
+        }
         dataSourceProperties.analyze();
     }
 }
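
Note that the isAlter checks in RoutineLoadDataSourceProperties (further below) require an offset or a default offset whenever the partition list is altered. A hedged illustration of a statement that should now be rejected, with a hypothetical job name:

```
ALTER ROUTINE LOAD FOR example_db.test_job
FROM KAFKA
(
    "kafka_partitions" = "0,1,2"
);
-- rejected: "Must set offset or default offset with partition property"
```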
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
index a828b1a..ffd782a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
@@ -31,21 +31,20 @@ import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.load.RoutineLoadDesc;
 import org.apache.doris.load.loadv2.LoadTask;
-import org.apache.doris.load.routineload.KafkaProgress;
-import org.apache.doris.load.routineload.LoadDataSourceType;
 import org.apache.doris.load.routineload.RoutineLoadJob;
 import org.apache.doris.qe.ConnectContext;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.function.Predicate;
-import java.util.regex.Pattern;
 
 /*
  Create routine Load statement,  continually load data from a streaming app
@@ -87,6 +86,8 @@ import java.util.regex.Pattern;
           KAFKA
 */
 public class CreateRoutineLoadStmt extends DdlStmt {
+    private static final Logger LOG = LogManager.getLogger(CreateRoutineLoadStmt.class);
+
     // routine load properties
     public static final String DESIRED_CONCURRENT_NUMBER_PROPERTY = "desired_concurrent_number";
     // max error number in ten thousand records
@@ -111,9 +112,10 @@ public class CreateRoutineLoadStmt extends DdlStmt {
     public static final String KAFKA_PARTITIONS_PROPERTY = "kafka_partitions";
     public static final String KAFKA_OFFSETS_PROPERTY = "kafka_offsets";
     public static final String KAFKA_DEFAULT_OFFSETS = "kafka_default_offsets";
-
+    public static final String KAFKA_ORIGIN_DEFAULT_OFFSETS = "kafka_origin_default_offsets";
+    
     private static final String NAME_TYPE = "ROUTINE LOAD NAME";
-    private static final String ENDPOINT_REGEX = "[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]";
+    public static final String ENDPOINT_REGEX = "[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]";
 
     private static final ImmutableSet<String> PROPERTIES_SET = new ImmutableSet.Builder<String>()
             .add(DESIRED_CONCURRENT_NUMBER_PROPERTY)
@@ -132,19 +134,12 @@ public class CreateRoutineLoadStmt extends DdlStmt {
             .add(EXEC_MEM_LIMIT_PROPERTY)
             .build();
 
-    private static final ImmutableSet<String> KAFKA_PROPERTIES_SET = new ImmutableSet.Builder<String>()
-            .add(KAFKA_BROKER_LIST_PROPERTY)
-            .add(KAFKA_TOPIC_PROPERTY)
-            .add(KAFKA_PARTITIONS_PROPERTY)
-            .add(KAFKA_OFFSETS_PROPERTY)
-            .build();
-
     private final LabelName labelName;
     private final String tableName;
     private final List<ParseNode> loadPropertyList;
     private final Map<String, String> jobProperties;
     private final String typeName;
-    private final Map<String, String> dataSourceProperties;
+    private final RoutineLoadDataSourceProperties dataSourceProperties;
 
     // the following variables will be initialized after analyze
     // -1 as unset, the default value will set in RoutineLoadJob
@@ -165,29 +160,21 @@ public class CreateRoutineLoadStmt extends DdlStmt {
      *   1) dataFormat = "json"
      *   2) jsonPaths = "$.XXX.xxx"
      */
-    private String format     = ""; //default is csv.
-    private String jsonPaths  = "";
-    private String jsonRoot   = ""; // MUST be a jsonpath string
+    private String format = ""; //default is csv.
+    private String jsonPaths = "";
+    private String jsonRoot = ""; // MUST be a jsonpath string
     private boolean stripOuterArray = false;
     private boolean numAsString = false;
     private boolean fuzzyParse = false;
 
-    // kafka related properties
-    private String kafkaBrokerList;
-    private String kafkaTopic;
-    // pair<partition id, offset>
-    private List<Pair<Integer, Long>> kafkaPartitionOffsets = Lists.newArrayList();
-
-    // custom kafka property map<key, value>
-    private Map<String, String> customKafkaProperties = Maps.newHashMap();
     private LoadTask.MergeType mergeType;
 
-    public static final Predicate<Long> DESIRED_CONCURRENT_NUMBER_PRED = (v) -> { return v > 0L; };
-    public static final Predicate<Long> MAX_ERROR_NUMBER_PRED = (v) -> { return v >= 0L; };
-    public static final Predicate<Long> MAX_BATCH_INTERVAL_PRED = (v) -> { return v >= 5 && v <= 60; };
-    public static final Predicate<Long> MAX_BATCH_ROWS_PRED = (v) -> { return v >= 200000; };
-    public static final Predicate<Long> MAX_BATCH_SIZE_PRED = (v) -> { return v >= 100 * 1024 * 1024 && v <= 1024 * 1024 * 1024; };
-    public static final Predicate<Long>  EXEC_MEM_LIMIT_PRED = (v) -> { return v >= 0L; };
+    public static final Predicate<Long> DESIRED_CONCURRENT_NUMBER_PRED = (v) -> v > 0L;
+    public static final Predicate<Long> MAX_ERROR_NUMBER_PRED = (v) -> v >= 0L;
+    public static final Predicate<Long> MAX_BATCH_INTERVAL_PRED = (v) -> v >= 5 && v <= 60;
+    public static final Predicate<Long> MAX_BATCH_ROWS_PRED = (v) -> v >= 200000;
+    public static final Predicate<Long> MAX_BATCH_SIZE_PRED = (v) -> v >= 100 * 1024 * 1024 && v <= 1024 * 1024 * 1024;
+    public static final Predicate<Long> EXEC_MEM_LIMIT_PRED = (v) -> v >= 0L;
 
     public CreateRoutineLoadStmt(LabelName labelName, String tableName, List<ParseNode> loadPropertyList,
                                  Map<String, String> jobProperties, String typeName,
@@ -197,7 +184,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
         this.loadPropertyList = loadPropertyList;
         this.jobProperties = jobProperties == null ? Maps.newHashMap() : jobProperties;
         this.typeName = typeName.toUpperCase();
-        this.dataSourceProperties = dataSourceProperties;
+        this.dataSourceProperties = new RoutineLoadDataSourceProperties(this.typeName, dataSourceProperties, false);
         this.mergeType = mergeType;
     }
 
@@ -278,25 +265,29 @@ public class CreateRoutineLoadStmt extends DdlStmt {
     }
 
     public String getKafkaBrokerList() {
-        return kafkaBrokerList;
+        return this.dataSourceProperties.getKafkaBrokerList();
     }
 
     public String getKafkaTopic() {
-        return kafkaTopic;
+        return this.dataSourceProperties.getKafkaTopic();
     }
 
     public List<Pair<Integer, Long>> getKafkaPartitionOffsets() {
-        return kafkaPartitionOffsets;
+        return this.dataSourceProperties.getKafkaPartitionOffsets();
     }
 
     public Map<String, String> getCustomKafkaProperties() {
-        return customKafkaProperties;
+        return this.dataSourceProperties.getCustomKafkaProperties();
     }
 
     public LoadTask.MergeType getMergeType() {
         return mergeType;
     }
 
+    public boolean isOffsetsForTimes() {
+        return this.dataSourceProperties.isOffsetsForTimes();
+    }
+
     @Override
     public void analyze(Analyzer analyzer) throws UserException {
         super.analyze(analyzer);
@@ -467,155 +458,8 @@ public class CreateRoutineLoadStmt extends DdlStmt {
         }
     }
 
-    private void checkDataSourceProperties() throws AnalysisException {
-        LoadDataSourceType type;
-        try {
-            type = LoadDataSourceType.valueOf(typeName);
-        } catch (IllegalArgumentException e) {
-            throw new AnalysisException("routine load job does not support this type " + typeName);
-        }
-        switch (type) {
-            case KAFKA:
-                checkKafkaProperties();
-                break;
-            default:
-                break;
-        }
-    }
-
-    private void checkKafkaProperties() throws AnalysisException {
-        Optional<String> optional = dataSourceProperties.keySet().stream()
-                .filter(entity -> !KAFKA_PROPERTIES_SET.contains(entity))
-                .filter(entity -> !entity.startsWith("property.")).findFirst();
-        if (optional.isPresent()) {
-            throw new AnalysisException(optional.get() + " is invalid kafka custom property");
-        }
-
-        // check broker list
-        kafkaBrokerList = Strings.nullToEmpty(dataSourceProperties.get(KAFKA_BROKER_LIST_PROPERTY)).replaceAll(" ", "");
-        if (Strings.isNullOrEmpty(kafkaBrokerList)) {
-            throw new AnalysisException(KAFKA_BROKER_LIST_PROPERTY + " is a required property");
-        }
-        String[] kafkaBrokerList = this.kafkaBrokerList.split(",");
-        for (String broker : kafkaBrokerList) {
-            if (!Pattern.matches(ENDPOINT_REGEX, broker)) {
-                throw new AnalysisException(KAFKA_BROKER_LIST_PROPERTY + ":" + broker
-                                                    + " not match pattern " + ENDPOINT_REGEX);
-            }
-        }
-
-        // check topic
-        kafkaTopic = Strings.nullToEmpty(dataSourceProperties.get(KAFKA_TOPIC_PROPERTY)).replaceAll(" ", "");
-        if (Strings.isNullOrEmpty(kafkaTopic)) {
-            throw new AnalysisException(KAFKA_TOPIC_PROPERTY + " is a required property");
-        }
-
-        // check partitions
-        String kafkaPartitionsString = dataSourceProperties.get(KAFKA_PARTITIONS_PROPERTY);
-        if (kafkaPartitionsString != null) {
-            analyzeKafkaPartitionProperty(kafkaPartitionsString, this.kafkaPartitionOffsets);
-        }
-
-        // check offset
-        String kafkaOffsetsString = dataSourceProperties.get(KAFKA_OFFSETS_PROPERTY);
-        if (kafkaOffsetsString != null) {
-            analyzeKafkaOffsetProperty(kafkaOffsetsString, this.kafkaPartitionOffsets);
-        }
-
-        // check custom kafka property
-        analyzeCustomProperties(this.dataSourceProperties, this.customKafkaProperties);
-    }
-
-    public static void analyzeKafkaPartitionProperty(String kafkaPartitionsString,
-            List<Pair<Integer, Long>> kafkaPartitionOffsets) throws AnalysisException {
-        kafkaPartitionsString = kafkaPartitionsString.replaceAll(" ", "");
-        if (kafkaPartitionsString.isEmpty()) {
-            throw new AnalysisException(KAFKA_PARTITIONS_PROPERTY + " could not be a empty string");
-        }
-        String[] kafkaPartitionsStringList = kafkaPartitionsString.split(",");
-        for (String s : kafkaPartitionsStringList) {
-            try {
-                kafkaPartitionOffsets.add(Pair.create(getIntegerValueFromString(s, KAFKA_PARTITIONS_PROPERTY),
-                        KafkaProgress.OFFSET_END_VAL));
-            } catch (AnalysisException e) {
-                throw new AnalysisException(KAFKA_PARTITIONS_PROPERTY
-                        + " must be a number string with comma-separated");
-            }
-        }
-    }
-
-    public static void analyzeKafkaOffsetProperty(String kafkaOffsetsString,
-            List<Pair<Integer, Long>> kafkaPartitionOffsets) throws AnalysisException {
-        kafkaOffsetsString = kafkaOffsetsString.replaceAll(" ", "");
-        if (kafkaOffsetsString.isEmpty()) {
-            throw new AnalysisException(KAFKA_OFFSETS_PROPERTY + " could not be a empty string");
-        }
-        String[] kafkaOffsetsStringList = kafkaOffsetsString.split(",");
-        if (kafkaOffsetsStringList.length != kafkaPartitionOffsets.size()) {
-            throw new AnalysisException("Partitions number should be equals to offsets number");
-        }
-
-        for (int i = 0; i < kafkaOffsetsStringList.length; i++) {
-            // defined in librdkafka/rdkafkacpp.h
-            // OFFSET_BEGINNING: -2
-            // OFFSET_END: -1
-            try {
-                kafkaPartitionOffsets.get(i).second = getLongValueFromString(kafkaOffsetsStringList[i],
-                        KAFKA_OFFSETS_PROPERTY);
-                if (kafkaPartitionOffsets.get(i).second < 0) {
-                    throw new AnalysisException("Can not specify offset smaller than 0");
-                }
-            } catch (AnalysisException e) {
-                if (kafkaOffsetsStringList[i].equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING)) {
-                    kafkaPartitionOffsets.get(i).second = KafkaProgress.OFFSET_BEGINNING_VAL;
-                } else if (kafkaOffsetsStringList[i].equalsIgnoreCase(KafkaProgress.OFFSET_END)) {
-                    kafkaPartitionOffsets.get(i).second = KafkaProgress.OFFSET_END_VAL;
-                } else {
-                    throw e;
-                }
-            }
-        }
-    }
-
-    public static void analyzeCustomProperties(Map<String, String> dataSourceProperties,
-            Map<String, String> customKafkaProperties) throws AnalysisException {
-        for (Map.Entry<String, String> dataSourceProperty : dataSourceProperties.entrySet()) {
-            if (dataSourceProperty.getKey().startsWith("property.")) {
-                String propertyKey = dataSourceProperty.getKey();
-                String propertyValue = dataSourceProperty.getValue();
-                String propertyValueArr[] = propertyKey.split("\\.");
-                if (propertyValueArr.length < 2) {
-                    throw new AnalysisException("kafka property value could not be a empty string");
-                }
-                customKafkaProperties.put(propertyKey.substring(propertyKey.indexOf(".") + 1), propertyValue);
-            }
-            // can be extended in the future which other prefix
-        }
-    }
-
-    private static int getIntegerValueFromString(String valueString, String propertyName) throws AnalysisException {
-        if (valueString.isEmpty()) {
-            throw new AnalysisException(propertyName + " could not be a empty string");
-        }
-        int value;
-        try {
-            value = Integer.valueOf(valueString);
-        } catch (NumberFormatException e) {
-            throw new AnalysisException(propertyName + " must be a integer");
-        }
-        return value;
-    }
-
-    private static long getLongValueFromString(String valueString, String propertyName) throws AnalysisException {
-        if (valueString.isEmpty()) {
-            throw new AnalysisException(propertyName + " could not be a empty string");
-        }
-        long value;
-        try {
-            value = Long.valueOf(valueString);
-        } catch (NumberFormatException e) {
-            throw new AnalysisException(propertyName + " must be a integer: " + valueString);
-        }
-        return value;
+    private void checkDataSourceProperties() throws UserException {
+        this.dataSourceProperties.setTimezone(this.timezone);
+        this.dataSourceProperties.analyze();
     }
 }
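
The unified analysis in RoutineLoadDataSourceProperties (next diff) also rejects mixing timestamp and numeric offsets in one statement, matching the docs above. A hedged illustration of an invalid data source clause; broker and topic names are placeholders:

```
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092",
    "kafka_topic" = "my_topic",
    "kafka_partitions" = "0,1",
    "kafka_offsets" = "1000, 2021-05-11 10:00:00"
);
-- rejected: a partition offset cannot be specified by both a timestamp and a number
```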
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/RoutineLoadDataSourceProperties.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/RoutineLoadDataSourceProperties.java
index 1743b2b..ca6f800 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/RoutineLoadDataSourceProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/RoutineLoadDataSourceProperties.java
@@ -19,42 +19,78 @@ package org.apache.doris.analysis;
 
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.load.routineload.KafkaProgress;
 import org.apache.doris.load.routineload.LoadDataSourceType;
 
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.gson.annotations.SerializedName;
 
+import org.apache.commons.lang3.math.NumberUtils;
+
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.TimeZone;
+import java.util.regex.Pattern;
 
 public class RoutineLoadDataSourceProperties {
-    private static final ImmutableSet<String> CONFIGURABLE_PROPERTIES_SET = new ImmutableSet.Builder<String>()
+
+    private static final ImmutableSet<String> DATA_SOURCE_PROPERTIES_SET = new ImmutableSet.Builder<String>()
+            .add(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY)
+            .add(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY)
             .add(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY)
             .add(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY)
+            .add(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS)
             .build();
-    
-    @SerializedName(value = "type")
-    private String type = "KAFKA";
+
+    private static final ImmutableSet<String> CONFIGURABLE_DATA_SOURCE_PROPERTIES_SET = new ImmutableSet.Builder<String>()
+            .add(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY)
+            .add(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY)
+            .add(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS)
+            .build();
+
     // origin properties, no need to persist
     private Map<String, String> properties = Maps.newHashMap();
+    private boolean isAlter = false;
+
+    @SerializedName(value = "type")
+    private String type = "KAFKA";
     @SerializedName(value = "kafkaPartitionOffsets")
     private List<Pair<Integer, Long>> kafkaPartitionOffsets = Lists.newArrayList();
     @SerializedName(value = "customKafkaProperties")
     private Map<String, String> customKafkaProperties = Maps.newHashMap();
+    @SerializedName(value = "isOffsetsForTimes")
+    private boolean isOffsetsForTimes = false;
+    @SerializedName(value = "kafkaBrokerList")
+    private String kafkaBrokerList;
+    @SerializedName(value = "KafkaTopic")
+    private String kafkaTopic;
+    @SerializedName(value = "timezone")
+    private String timezone;
 
     public RoutineLoadDataSourceProperties() {
-        // empty
+        // for unit test, and empty data source properties when altering routine load
+        this.isAlter = true;
     }
 
-    public RoutineLoadDataSourceProperties(String type, Map<String, String> properties) {
+    public RoutineLoadDataSourceProperties(String type, Map<String, String> properties, boolean isAlter) {
         this.type = type.toUpperCase();
         this.properties = properties;
+        this.isAlter = isAlter;
     }
 
-    public void analyze() throws AnalysisException {
+    public void analyze() throws UserException {
+        if (properties.isEmpty()) {
+            throw new AnalysisException("No properties");
+        }
+        Preconditions.checkState(!Strings.isNullOrEmpty(timezone), "timezone 
must be set before analyzing");
         checkDataSourceProperties();
     }
 
@@ -70,11 +106,31 @@ public class RoutineLoadDataSourceProperties {
         return kafkaPartitionOffsets;
     }
 
+    public void setKafkaPartitionOffsets(List<Pair<Integer, Long>> kafkaPartitionOffsets) {
+        this.kafkaPartitionOffsets = kafkaPartitionOffsets;
+    }
+
     public Map<String, String> getCustomKafkaProperties() {
         return customKafkaProperties;
     }
 
-    private void checkDataSourceProperties() throws AnalysisException {
+    public void setTimezone(String timezone) {
+        this.timezone = timezone;
+    }
+
+    public String getKafkaBrokerList() {
+        return kafkaBrokerList;
+    }
+
+    public String getKafkaTopic() {
+        return kafkaTopic;
+    }
+
+    public boolean isOffsetsForTimes() {
+        return isOffsetsForTimes;
+    }
+
+    private void checkDataSourceProperties() throws UserException {
         LoadDataSourceType sourceType;
         try {
             sourceType = LoadDataSourceType.valueOf(type);
@@ -90,37 +146,242 @@ public class RoutineLoadDataSourceProperties {
         }
     }
 
-    private void checkKafkaProperties() throws AnalysisException {
+    /*
+     * Kafka properties include the following:
+     * 1. broker list
+     * 2. topic
+     * 3. partition offset info
+     * 4. other properties starting with "property."
+     */
+    private void checkKafkaProperties() throws UserException {
+        ImmutableSet<String> propertySet = isAlter ? CONFIGURABLE_DATA_SOURCE_PROPERTIES_SET : DATA_SOURCE_PROPERTIES_SET;
         Optional<String> optional = properties.keySet().stream().filter(
-                entity -> !CONFIGURABLE_PROPERTIES_SET.contains(entity)).filter(
-                        entity -> !entity.startsWith("property.")).findFirst();
+                entity -> !propertySet.contains(entity)).filter(
+                entity -> !entity.startsWith("property.")).findFirst();
         if (optional.isPresent()) {
-            throw new AnalysisException(optional.get() + " is invalid kafka 
custom property");
+            throw new AnalysisException(optional.get() + " is invalid kafka 
property or can not be set");
+        }
+
+        // check broker list
+        kafkaBrokerList = Strings.nullToEmpty(properties.get(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY)).replaceAll(" ", "");
+        if (!isAlter && Strings.isNullOrEmpty(kafkaBrokerList)) {
+            throw new AnalysisException(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY + " is a required property");
+        }
+        if (!Strings.isNullOrEmpty(kafkaBrokerList)) {
+            String[] kafkaBrokerList = this.kafkaBrokerList.split(",");
+            for (String broker : kafkaBrokerList) {
+                if (!Pattern.matches(CreateRoutineLoadStmt.ENDPOINT_REGEX, broker)) {
+                    throw new AnalysisException(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY + ":" + broker
+                            + " not match pattern " + CreateRoutineLoadStmt.ENDPOINT_REGEX);
+                }
+            }
+        }
+
+        // check topic
+        kafkaTopic = Strings.nullToEmpty(properties.get(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY)).replaceAll(" ", "");
+        if (!isAlter && Strings.isNullOrEmpty(kafkaTopic)) {
+            throw new AnalysisException(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY + " is a required property");
         }
 
+        // check custom kafka property
+        // This should be done before checking partitions and offsets, because we need KAFKA_DEFAULT_OFFSETS,
+        // which is in custom properties.
+        analyzeCustomProperties(this.properties, this.customKafkaProperties);
+
+        // The partition offset properties are all optional,
+        // and there are 5 valid cases for specifying partition offsets:
+        // A. partition, offset and default offset are not set
+        //      Doris will set default offset to OFFSET_END
+        // B. partition and offset are set, default offset is not set
+        //      fill the "kafkaPartitionOffsets" with partition and offset
+        // C. partition and default offset are set, offset is not set
+        //      fill the "kafkaPartitionOffsets" with partition and default 
offset
+        // D. partition is set, offset and default offset are not set
+        //      this is only valid when doing create routine load operation,
+        //      fill the "kafkaPartitionOffsets" with partition and OFFSET_END
+        // E. only default offset is set.
+        //      this is only valid when doing alter routine load operation.
+        // Other cases are illegal.
+
         // check partitions
-        final String kafkaPartitionsString = properties.get(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY);
+        String kafkaPartitionsString = properties.get(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY);
         if (kafkaPartitionsString != null) {
+            analyzeKafkaPartitionProperty(kafkaPartitionsString, this.kafkaPartitionOffsets);
+        }
 
-            if (!properties.containsKey(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY)) {
-                throw new AnalysisException("Partition and offset must be specified at the same time");
+        // check offset
+        String kafkaOffsetsString = properties.get(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY);
+        String kafkaDefaultOffsetString = customKafkaProperties.get(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS);
+        if (kafkaOffsetsString != null && kafkaDefaultOffsetString != null) {
+            throw new AnalysisException("Only one of " + CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY +
+                    " and " + CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS + " can be set.");
+        }
+        if (isAlter && kafkaPartitionsString != null && kafkaOffsetsString == null && kafkaDefaultOffsetString == null) {
+            // if this is an alter operation, the partition and (default) offset must be set together.
+            throw new AnalysisException("Must set offset or default offset with partition property");
+        }
+
+        if (kafkaOffsetsString != null) {
+            this.isOffsetsForTimes = analyzeKafkaOffsetProperty(kafkaOffsetsString, this.kafkaPartitionOffsets, this.timezone);
+        } else {
+            // offset is not set, check default offset.
+            this.isOffsetsForTimes = analyzeKafkaDefaultOffsetProperty(this.customKafkaProperties, this.timezone);
+            if (!this.kafkaPartitionOffsets.isEmpty()) {
+                // Case C
+                kafkaDefaultOffsetString = customKafkaProperties.get(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS);
+                setDefaultOffsetForPartition(this.kafkaPartitionOffsets, kafkaDefaultOffsetString, this.isOffsetsForTimes);
             }
+        }
+    }
 
-            CreateRoutineLoadStmt.analyzeKafkaPartitionProperty(kafkaPartitionsString, kafkaPartitionOffsets);
+    private static void setDefaultOffsetForPartition(List<Pair<Integer, Long>> kafkaPartitionOffsets,
+                                                     String kafkaDefaultOffsetString, boolean isOffsetsForTimes) {
+        if (isOffsetsForTimes) {
+            for (Pair<Integer, Long> pair : kafkaPartitionOffsets) {
+                pair.second = Long.valueOf(kafkaDefaultOffsetString);
+            }
         } else {
-            if (properties.containsKey(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY)) {
-                throw new AnalysisException("Missing kafka partition info");
+            for (Pair<Integer, Long> pair : kafkaPartitionOffsets) {
+                if (kafkaDefaultOffsetString.equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING)) {
+                    pair.second = KafkaProgress.OFFSET_BEGINNING_VAL;
+                } else {
+                    pair.second = KafkaProgress.OFFSET_END_VAL;
+                }
             }
         }
+    }
 
-        // check offset
-        String kafkaOffsetsString = properties.get(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY);
-        if (kafkaOffsetsString != null) {
-            CreateRoutineLoadStmt.analyzeKafkaOffsetProperty(kafkaOffsetsString, kafkaPartitionOffsets);
+    // If the default offset is not set, set the default offset to OFFSET_END.
+    // If the offset is in datetime format, convert it to a timestamp, and also save the origin datetime formatted offset
+    // in "customKafkaProperties".
+    // Return true if the offset is in datetime format.
+    private static boolean analyzeKafkaDefaultOffsetProperty(Map<String, String> customKafkaProperties, String timeZoneStr)
+            throws AnalysisException {
+        customKafkaProperties.putIfAbsent(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS, KafkaProgress.OFFSET_END);
+        String defaultOffsetStr = customKafkaProperties.get(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS);
+        TimeZone timeZone = TimeUtils.getOrSystemTimeZone(timeZoneStr);
+        long defaultOffset = TimeUtils.timeStringToLong(defaultOffsetStr, timeZone);
+        if (defaultOffset != -1) {
+            // this is a datetime format offset
+            customKafkaProperties.put(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS, String.valueOf(defaultOffset));
+            // we convert datetime to timestamp, and save the origin datetime formatted offset for further use.
+            customKafkaProperties.put(CreateRoutineLoadStmt.KAFKA_ORIGIN_DEFAULT_OFFSETS, defaultOffsetStr);
+            return true;
+        } else {
+            if (!defaultOffsetStr.equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING) && !defaultOffsetStr.equalsIgnoreCase(KafkaProgress.OFFSET_END)) {
+                throw new AnalysisException(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS + " can only be set to OFFSET_BEGINNING, OFFSET_END or date time");
+            }
+            return false;
+        }
+    }
+
+    // init "kafkaPartitionOffsets" with partition property.
+    // The offset will be set to OFFSET_END for now, and will be changed later in the analysis process.
+    private static void analyzeKafkaPartitionProperty(String kafkaPartitionsString,
+                                                      List<Pair<Integer, Long>> kafkaPartitionOffsets) throws AnalysisException {
+        kafkaPartitionsString = kafkaPartitionsString.replaceAll(" ", "");
+        if (kafkaPartitionsString.isEmpty()) {
+            throw new AnalysisException(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY + " could not be a empty string");
+        }
+        String[] kafkaPartitionsStringList = kafkaPartitionsString.split(",");
+        for (String s : kafkaPartitionsStringList) {
+            try {
+                kafkaPartitionOffsets.add(Pair.create(getIntegerValueFromString(s, CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY),
+                        KafkaProgress.OFFSET_END_VAL));
+            } catch (AnalysisException e) {
+                throw new AnalysisException(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY
+                        + " must be a number string with comma-separated");
+            }
+        }
+    }
+
+    // Fill the partition's offset with given kafkaOffsetsString,
+    // Return true if offset is specified by timestamp.
+    private static boolean analyzeKafkaOffsetProperty(String kafkaOffsetsString, List<Pair<Integer, Long>> kafkaPartitionOffsets,
+                                                      String timeZoneStr)
+            throws UserException {
+        if (Strings.isNullOrEmpty(kafkaOffsetsString)) {
+            throw new AnalysisException(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY + " could not be a empty string");
+        }
+        List<String> kafkaOffsetsStringList = Splitter.on(",").trimResults().splitToList(kafkaOffsetsString);
+        if (kafkaOffsetsStringList.size() != kafkaPartitionOffsets.size()) {
+            throw new AnalysisException("Partitions number should be equals to 
offsets number");
         }
 
-        // check custom properties
-        CreateRoutineLoadStmt.analyzeCustomProperties(properties, customKafkaProperties);
+        // We support two ways to specify the offset,
+        // one is to specify the offset directly, the other is to specify a timestamp.
+        // Doris will get the offset of the corresponding partition through the timestamp.
+        // The user can only choose one of these methods.
+        boolean foundTime = false;
+        boolean foundOffset = false;
+        for (String kafkaOffsetsStr : kafkaOffsetsStringList) {
+            if (TimeUtils.timeStringToLong(kafkaOffsetsStr) != -1) {
+                foundTime = true;
+            } else {
+                foundOffset = true;
+            }
+        }
+        if (foundTime && foundOffset) {
+            throw new AnalysisException("The offset of the partition cannot be 
specified by the timestamp " +
+                    "and the offset at the same time");
+        }
+
+        if (foundTime) {
+            // Convert all datetime strings to timestamps
+            // and set them as the partitions' offsets.
+            // These timestamps will be converted to real offsets when the job is running.
+            TimeZone timeZone = TimeUtils.getOrSystemTimeZone(timeZoneStr);
+            for (int i = 0; i < kafkaOffsetsStringList.size(); i++) {
+                String kafkaOffsetsStr = kafkaOffsetsStringList.get(i);
+                long timestamp = TimeUtils.timeStringToLong(kafkaOffsetsStr, 
timeZone);
+                Preconditions.checkState(timestamp != -1);
+                kafkaPartitionOffsets.get(i).second = timestamp;
+            }
+        } else {
+            for (int i = 0; i < kafkaOffsetsStringList.size(); i++) {
+                String kafkaOffsetsStr = kafkaOffsetsStringList.get(i);
+                if 
(kafkaOffsetsStr.equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING)) {
+                    kafkaPartitionOffsets.get(i).second = 
KafkaProgress.OFFSET_BEGINNING_VAL;
+                } else if 
(kafkaOffsetsStr.equalsIgnoreCase(KafkaProgress.OFFSET_END)) {
+                    kafkaPartitionOffsets.get(i).second = 
KafkaProgress.OFFSET_END_VAL;
+                } else if (NumberUtils.isDigits(kafkaOffsetsStr)) {
+                    kafkaPartitionOffsets.get(i).second = NumberUtils.toLong(kafkaOffsetsStr);
+                } else {
+                    throw new AnalysisException(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY + " must be an integer or a datetime");
+                }
+            }
+        }
+
+        return foundTime;
+    }
+
+    private static void analyzeCustomProperties(Map<String, String> dataSourceProperties,
+                                                Map<String, String> customKafkaProperties) throws AnalysisException {
+        for (Map.Entry<String, String> dataSourceProperty : dataSourceProperties.entrySet()) {
+            if (dataSourceProperty.getKey().startsWith("property.")) {
+                String propertyKey = dataSourceProperty.getKey();
+                String propertyValue = dataSourceProperty.getValue();
+                String[] propertyKeyArr = propertyKey.split("\\.");
+                if (propertyKeyArr.length < 2) {
+                    throw new AnalysisException("kafka custom property name cannot be empty");
+                }
+                customKafkaProperties.put(propertyKey.substring(propertyKey.indexOf(".") + 1), propertyValue);
+            }
+            // other prefixes can be supported in the future
+        }
+    }
+
+    private static int getIntegerValueFromString(String valueString, String propertyName) throws AnalysisException {
+        if (valueString.isEmpty()) {
+            throw new AnalysisException(propertyName + " cannot be an empty string");
+        }
+        int value;
+        try {
+            value = Integer.parseInt(valueString);
+        } catch (NumberFormatException e) {
+            throw new AnalysisException(propertyName + " must be an integer");
+        }
+        return value;
     }
 
     @Override
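
To make the rule above concrete: each entry of "kafka_offsets" is classified as a datetime if it parses with the "yyyy-MM-dd HH:mm:ss" pattern, and as a plain offset otherwise; mixing the two styles is rejected. A minimal standalone sketch of that classification (hypothetical class, not part of this patch; it mirrors TimeUtils.timeStringToLong() returning -1 for non-datetime strings):

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.List;

public class OffsetStyleCheck {
    // Mirrors TimeUtils.timeStringToLong(): -1 means "not a datetime".
    static long parseDatetime(String s) {
        try {
            return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(s).getTime();
        } catch (ParseException e) {
            return -1;
        }
    }

    public static void main(String[] args) {
        List<String> offsets = Arrays.asList("2021-10-10 11:00:00", "100");
        boolean foundTime = false;
        boolean foundOffset = false;
        for (String s : offsets) {
            if (parseDatetime(s) != -1) {
                foundTime = true;
            } else {
                foundOffset = true;
            }
        }
        if (foundTime && foundOffset) {
            // same rule as analyzeKafkaOffsetProperty() above
            System.out.println("rejected: timestamp and offset mixed");
        }
    }
}
```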
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java
index 4ecb520..6691af6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java
@@ -20,6 +20,7 @@ package org.apache.doris.common.util;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.common.ClientPool;
 import org.apache.doris.common.LoadException;
+import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.proto.InternalService;
 import org.apache.doris.rpc.BackendServiceProxy;
@@ -28,6 +29,8 @@ import org.apache.doris.thrift.BackendService;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TStatusCode;
 
+import com.google.common.collect.Lists;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -93,5 +96,76 @@ public class KafkaUtil {
             }
         }
     }
+
+    // Get offsets by times.
+    // The input parameter "timestampOffsets" is a list of <partition, timestamp> pairs.
+    // The return value is a list of <partition, offset> pairs.
+    public static List<Pair<Integer, Long>> getOffsetsForTimes(String 
brokerList, String topic,
+                                                               Map<String, 
String> convertedCustomProperties,
+                                                               
List<Pair<Integer, Long>> timestampOffsets) throws LoadException {
+        // The request goes through brpc (BackendServiceProxy), so no pooled thrift client is needed here.
+        TNetworkAddress address = null;
+        LOG.debug("begin to get offsets for times of topic: {}, {}", topic, timestampOffsets);
+        try {
+            List<Long> backendIds = 
Catalog.getCurrentSystemInfo().getBackendIds(true);
+            if (backendIds.isEmpty()) {
+                throw new LoadException("Failed to get offset for times. No 
alive backends");
+            }
+            Collections.shuffle(backendIds);
+            Backend be = 
Catalog.getCurrentSystemInfo().getBackend(backendIds.get(0));
+            address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
+
+            // create request
+            InternalService.PKafkaMetaProxyRequest.Builder metaRequestBuilder =
+                    InternalService.PKafkaMetaProxyRequest.newBuilder()
+                            
.setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
+                                    .setBrokers(brokerList)
+                                    .setTopic(topic)
+                                    .addAllProperties(
+                                            
convertedCustomProperties.entrySet().stream().map(
+                                                    e -> 
InternalService.PStringPair.newBuilder()
+                                                            .setKey(e.getKey())
+                                                            
.setVal(e.getValue())
+                                                            .build()
+                                            ).collect(Collectors.toList())
+                                    )
+                            );
+            for (Pair<Integer, Long> pair : timestampOffsets) {
+                
metaRequestBuilder.addOffsetTimes(InternalService.PIntegerPair.newBuilder().setKey(pair.first)
+                        .setVal(pair.second).build());
+            }
+
+            InternalService.PProxyRequest request = 
InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
+                    metaRequestBuilder).build();
+
+            // get info
+            Future<InternalService.PProxyResult> future = 
BackendServiceProxy.getInstance().getInfo(address, request);
+            InternalService.PProxyResult result = future.get(5, 
TimeUnit.SECONDS);
+            TStatusCode code = 
TStatusCode.findByValue(result.getStatus().getStatusCode());
+            if (code != TStatusCode.OK) {
+                throw new UserException("failed to get kafka partition info: " 
+ result.getStatus().getErrorMsgsList());
+            } else {
+                List<InternalService.PIntegerPair> pairs = 
result.getPartitionOffsets().getOffsetTimesList();
+                List<Pair<Integer, Long>> partitionOffsets = 
Lists.newArrayList();
+                for (InternalService.PIntegerPair pair : pairs) {
+                    partitionOffsets.add(Pair.create(pair.getKey(), 
pair.getVal()));
+                }
+                LOG.debug("finish to get offsets for times of topic: {}, {}", 
topic, partitionOffsets);
+                return partitionOffsets;
+            }
+        } catch (Exception e) {
+            LOG.warn("failed to get offsets for times.", e);
+            throw new LoadException(
+                    "Failed to get offsets for times of kafka topic: " + topic + ". error: " + e.getMessage());
+        }
+    }
 }
 
+
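
For reference, a hedged usage sketch of the new KafkaUtil.getOffsetsForTimes() helper added above (broker list, topic and timestamps are illustrative values; per Kafka's offsets-for-times semantics, each returned offset is the earliest one whose message timestamp is >= the requested time):

```java
import org.apache.doris.common.LoadException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.KafkaUtil;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import java.util.List;

public class OffsetsForTimesExample {
    static List<Pair<Integer, Long>> resolve() throws LoadException {
        List<Pair<Integer, Long>> timestamps = Lists.newArrayList();
        timestamps.add(Pair.create(0, 1633834800000L)); // partition 0 -> 2021-10-10 11:00:00 GMT+8
        timestamps.add(Pair.create(1, 1633838400000L)); // partition 1 -> 2021-10-10 12:00:00 GMT+8
        // A random alive BE acts as the proxy; custom kafka properties are empty here.
        return KafkaUtil.getOffsetsForTimes("broker1:9092,broker2:9092", "my_topic",
                Maps.newHashMap(), timestamps);
    }
}
```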
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java
index 85901be..4feded8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java
@@ -233,10 +233,6 @@ public class TimeUtils {
         }
     }
 
-    public static long dateTransform(long time, Type type) {
-        return dateTransform(time, type.getPrimitiveType());
-    }
-
     public static long timeStringToLong(String timeStr) {
         Date d;
         try {
@@ -247,6 +243,18 @@ public class TimeUtils {
         return d.getTime();
     }
 
+    public static long timeStringToLong(String timeStr, TimeZone timeZone) {
+        SimpleDateFormat dateFormatTimeZone = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        dateFormatTimeZone.setTimeZone(timeZone);
+        Date d;
+        try {
+            d = dateFormatTimeZone.parse(timeStr);
+        } catch (ParseException e) {
+            return -1;
+        }
+        return d.getTime();
+    }
+
     // Check if the time zone_value is valid
     public static String checkTimeZoneValidAndStandardize(String value) throws 
DdlException {
         try {
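
A quick sanity check of the new overload (expected values taken from the unit tests later in this patch; Asia/Shanghai is assumed as the session time zone):

```java
import org.apache.doris.common.util.TimeUtils;

import java.util.TimeZone;

public class TimeStringToLongCheck {
    public static void main(String[] args) {
        TimeZone tz = TimeZone.getTimeZone("Asia/Shanghai");
        // 2021-10-10 11:00:00 GMT+8 == 2021-10-10 03:00:00 UTC
        System.out.println(TimeUtils.timeStringToLong("2021-10-10 11:00:00", tz)); // 1633834800000
        // Anything that does not match "yyyy-MM-dd HH:mm:ss" yields -1,
        // which the analyzers use as the "not a datetime" signal.
        System.out.println(TimeUtils.timeStringToLong("OFFSET_END", tz));          // -1
    }
}
```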
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 856ae0d..e667515 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -40,14 +40,12 @@ import org.apache.doris.common.util.LogBuilder;
 import org.apache.doris.common.util.LogKey;
 import org.apache.doris.common.util.SmallFileMgr;
 import org.apache.doris.common.util.SmallFileMgr.SmallFile;
+import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.transaction.TransactionState;
 import org.apache.doris.transaction.TransactionStatus;
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -55,6 +53,9 @@ import com.google.common.collect.Maps;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -62,6 +63,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.TimeZone;
 import java.util.UUID;
 
 /**
@@ -80,6 +82,9 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     // current kafka partitions is the actually partition which will be fetched
     private List<Integer> currentKafkaPartitions = Lists.newArrayList();
     // optional, user want to set default offset when new partition add or 
offset not set.
+    // kafkaDefaultOffSet has two formats: one is the datetime format, e.g. "2021-10-10 11:00:00",
+    // the other is a string value, i.e. OFFSET_END or OFFSET_BEGINNING.
+    // We should check it by calling the isOffsetForTimes() method before using it.
     private String kafkaDefaultOffSet = "";
     // kafka properties ,property prefix will be mapped to kafka custom 
parameters, which can be extended in the future
     private Map<String, String> customProperties = Maps.newHashMap();
@@ -110,6 +115,16 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         return convertedCustomProperties;
     }
 
+    private boolean isOffsetForTimes() {
+        long offset = TimeUtils.timeStringToLong(this.kafkaDefaultOffSet);
+        return offset != -1;
+    }
+
+    private long convertedDefaultOffsetToTimestamp() {
+        TimeZone timeZone = TimeUtils.getOrSystemTimeZone(getTimezone());
+        return TimeUtils.timeStringToLong(this.kafkaDefaultOffSet, timeZone);
+    }
+
     @Override
     public void prepare() throws UserException {
         super.prepare();
@@ -142,7 +157,14 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
                 convertedCustomProperties.put(entry.getKey(), 
entry.getValue());
             }
         }
-        if (convertedCustomProperties.containsKey(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS)) {
+
+        // This is mainly for compatibility. In the previous version, we directly obtained the value of the
+        // KAFKA_DEFAULT_OFFSETS attribute. In the new version, we support a datetime as the value of KAFKA_DEFAULT_OFFSETS,
+        // and this attribute will be converted into a timestamp during the analyzing phase, thus losing some information.
+        // So we use KAFKA_ORIGIN_DEFAULT_OFFSETS to store the original datetime formatted KAFKA_DEFAULT_OFFSETS value.
+        if (convertedCustomProperties.containsKey(CreateRoutineLoadStmt.KAFKA_ORIGIN_DEFAULT_OFFSETS)) {
+            kafkaDefaultOffSet = convertedCustomProperties.remove(CreateRoutineLoadStmt.KAFKA_ORIGIN_DEFAULT_OFFSETS);
+        } else if (convertedCustomProperties.containsKey(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS)) {
             kafkaDefaultOffSet = convertedCustomProperties.remove(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS);
         }
     }
@@ -254,7 +276,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     }
 
     @Override
-    protected void unprotectUpdateProgress() {
+    protected void unprotectUpdateProgress() throws UserException {
         updateNewPartitionProgress();
     }
 
@@ -389,14 +411,21 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         }
     }
 
-    private void updateNewPartitionProgress() {
+    private void updateNewPartitionProgress() throws LoadException {
         // update the progress of new partitions
         for (Integer kafkaPartition : currentKafkaPartitions) {
             if (!((KafkaProgress) progress).containsPartition(kafkaPartition)) 
{
                 // if offset is not assigned, start from OFFSET_END
                 long beginOffSet = KafkaProgress.OFFSET_END_VAL;
                 if (!kafkaDefaultOffSet.isEmpty()) {
-                    if (kafkaDefaultOffSet.equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING)) {
+                    if (isOffsetForTimes()) {
+                        // get offset by time
+                        List<Pair<Integer, Long>> offsets = Lists.newArrayList();
+                        offsets.add(Pair.create(kafkaPartition, convertedDefaultOffsetToTimestamp()));
+                        offsets = KafkaUtil.getOffsetsForTimes(this.brokerList, this.topic, convertedCustomProperties, offsets);
+                        Preconditions.checkState(offsets.size() == 1);
+                        beginOffSet = offsets.get(0).second;
+                    } else if (kafkaDefaultOffSet.equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING)) {
                         beginOffSet = KafkaProgress.OFFSET_BEGINNING_VAL;
                     } else if (kafkaDefaultOffSet.equalsIgnoreCase(KafkaProgress.OFFSET_END)) {
                         beginOffSet = KafkaProgress.OFFSET_END_VAL;
@@ -420,7 +449,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         super.setOptional(stmt);
 
         if (!stmt.getKafkaPartitionOffsets().isEmpty()) {
-            setCustomKafkaPartitions(stmt.getKafkaPartitionOffsets());
+            setCustomKafkaPartitions(stmt);
         }
         if (!stmt.getCustomKafkaProperties().isEmpty()) {
             setCustomKafkaProperties(stmt.getCustomKafkaProperties());
@@ -428,7 +457,15 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     }
 
     // this is a unprotected method which is called in the initialization 
function
-    private void setCustomKafkaPartitions(List<Pair<Integer, Long>> kafkaPartitionOffsets) throws LoadException {
+    private void setCustomKafkaPartitions(CreateRoutineLoadStmt stmt) throws LoadException {
+        List<Pair<Integer, Long>> kafkaPartitionOffsets = stmt.getKafkaPartitionOffsets();
+        boolean isForTimes = stmt.isOffsetsForTimes();
+        if (isForTimes) {
+            // the offsets are set by datetime, we need to get the real offsets by time
+            kafkaPartitionOffsets = KafkaUtil.getOffsetsForTimes(stmt.getKafkaBrokerList(), stmt.getKafkaTopic(),
+                    convertedCustomProperties, stmt.getKafkaPartitionOffsets());
+        }
+
         for (Pair<Integer, Long> partitionOffset : kafkaPartitionOffsets) {
             this.customKafkaPartitions.add(partitionOffset.first);
             ((KafkaProgress) progress).addPartitionOffset(partitionOffset);
@@ -497,9 +534,13 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     }
 
     @Override
-    public void modifyProperties(AlterRoutineLoadStmt stmt) throws 
DdlException {
+    public void modifyProperties(AlterRoutineLoadStmt stmt) throws 
UserException {
         Map<String, String> jobProperties = stmt.getAnalyzedJobProperties();
         RoutineLoadDataSourceProperties dataSourceProperties = 
stmt.getDataSourceProperties();
+        if (dataSourceProperties.isOffsetsForTimes()) {
+            // if the partition offset is set by timestamp, convert it to real 
offset
+            convertTimestampToOffset(dataSourceProperties);
+        }
 
         writeLock();
         try {
@@ -507,6 +548,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
                 throw new DdlException("Only supports modification of PAUSED 
jobs");
             }
 
+
             modifyPropertiesInternal(jobProperties, dataSourceProperties);
 
             AlterRoutineLoadJobOperationLog log = new 
AlterRoutineLoadJobOperationLog(this.id,
@@ -517,6 +559,16 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         }
     }
 
+    private void convertTimestampToOffset(RoutineLoadDataSourceProperties dataSourceProperties) throws UserException {
+        List<Pair<Integer, Long>> partitionOffsets = dataSourceProperties.getKafkaPartitionOffsets();
+        if (partitionOffsets.isEmpty()) {
+            return;
+        }
+        List<Pair<Integer, Long>> newOffsets = KafkaUtil.getOffsetsForTimes(brokerList, topic,
+                convertedCustomProperties, partitionOffsets);
+        dataSourceProperties.setKafkaPartitionOffsets(newOffsets);
+    }
+
     private void modifyPropertiesInternal(Map<String, String> jobProperties,
                                           RoutineLoadDataSourceProperties 
dataSourceProperties)
             throws DdlException {
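
Putting the pieces above together, a datetime-valued default offset for a newly discovered partition resolves roughly as in this sketch (a hypothetical standalone method; in the job itself brokerList, topic and convertedCustomProperties are fields):

```java
import org.apache.doris.common.LoadException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.KafkaUtil;
import org.apache.doris.common.util.TimeUtils;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;

import java.util.List;
import java.util.Map;

public class DefaultOffsetByTimeSketch {
    // Resolve a datetime default offset (e.g. "2021-10-10 11:00:00") for one new partition.
    static long resolve(String brokerList, String topic, Map<String, String> props,
                        int newPartition, String defaultOffsetStr, String timezone) throws LoadException {
        long ts = TimeUtils.timeStringToLong(defaultOffsetStr, TimeUtils.getOrSystemTimeZone(timezone));
        Preconditions.checkState(ts != -1); // the caller has already verified isOffsetForTimes()
        List<Pair<Integer, Long>> req = Lists.newArrayList(Pair.create(newPartition, ts));
        return KafkaUtil.getOffsetsForTimes(brokerList, topic, props, req).get(0).second;
    }
}
```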
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 96ddf79..3ca3825 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -67,9 +67,6 @@ import org.apache.doris.transaction.TransactionException;
 import org.apache.doris.transaction.TransactionState;
 import org.apache.doris.transaction.TransactionStatus;
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -79,6 +76,9 @@ import com.google.common.collect.Maps;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -1269,7 +1269,7 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
         }
     }
 
-    protected void unprotectUpdateProgress() {
+    protected void unprotectUpdateProgress() throws UserException {
     }
 
     protected boolean unprotectNeedReschedule() throws UserException {
@@ -1544,7 +1544,7 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
         }
     }
 
-    abstract public void modifyProperties(AlterRoutineLoadStmt stmt) throws 
DdlException;
+    abstract public void modifyProperties(AlterRoutineLoadStmt stmt) throws 
UserException;
 
     abstract public void 
replayModifyProperties(AlterRoutineLoadJobOperationLog log);
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index 682d3a1..d89bde7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -205,7 +205,7 @@ public class RoutineLoadManager implements Writable {
         return false;
     }
 
-    private RoutineLoadJob checkPrivAndGetJob(String dbName, String jobName)
+    public RoutineLoadJob checkPrivAndGetJob(String dbName, String jobName)
             throws MetaNotFoundException, DdlException, AnalysisException {
         RoutineLoadJob routineLoadJob = getJob(dbName, jobName);
         if (routineLoadJob == null) {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/analysis/AlterRoutineLoadStmtTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/analysis/AlterRoutineLoadStmtTest.java
index 4246b30..f9ddb688 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/analysis/AlterRoutineLoadStmtTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/analysis/AlterRoutineLoadStmtTest.java
@@ -18,6 +18,7 @@
 package org.apache.doris.analysis;
 
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.UserException;
 import org.apache.doris.mysql.privilege.PaloAuth;
 import org.apache.doris.mysql.privilege.PrivPredicate;
@@ -49,7 +50,7 @@ public class AlterRoutineLoadStmtTest {
     @Before
     public void setUp() {
         analyzer = AccessTestUtil.fetchAdminAnalyzer(false);
-
+        FeConstants.runningUnitTest = true;
         new Expectations() {
             {
                 auth.checkGlobalPriv((ConnectContext) any, (PrivPredicate) 
any);
@@ -80,7 +81,7 @@ public class AlterRoutineLoadStmtTest {
             
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, 
"1,2,3");
             
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "10000, 
20000, 30000");
             RoutineLoadDataSourceProperties routineLoadDataSourceProperties = 
new RoutineLoadDataSourceProperties(
-                    typeName, dataSourceProperties);
+                    typeName, dataSourceProperties, true);
             AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new 
LabelName("db1", "label1"),
                     jobProperties, routineLoadDataSourceProperties);
             try {
@@ -131,7 +132,7 @@ public class AlterRoutineLoadStmtTest {
             Map<String, String> dataSourceProperties = Maps.newHashMap();
             
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, 
"new_topic");
             RoutineLoadDataSourceProperties routineLoadDataSourceProperties = 
new RoutineLoadDataSourceProperties(
-                    typeName, dataSourceProperties);
+                    typeName, dataSourceProperties, true);
             AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new 
LabelName("db1", "label1"),
                     jobProperties, routineLoadDataSourceProperties);
 
@@ -139,8 +140,9 @@ public class AlterRoutineLoadStmtTest {
                 stmt.analyze(analyzer);
                 Assert.fail();
             } catch (AnalysisException e) {
-                Assert.assertTrue(e.getMessage().contains("kafka_topic is 
invalid kafka custom property"));
+                Assert.assertTrue(e.getMessage().contains("kafka_topic is 
invalid kafka property"));
             } catch (UserException e) {
+                e.printStackTrace();
                 Assert.fail();
             }
         }
@@ -152,14 +154,14 @@ public class AlterRoutineLoadStmtTest {
             Map<String, String> dataSourceProperties = Maps.newHashMap();
             
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, 
"1,2,3");
             RoutineLoadDataSourceProperties routineLoadDataSourceProperties = 
new RoutineLoadDataSourceProperties(
-                    typeName, dataSourceProperties);
+                    typeName, dataSourceProperties, true);
             AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new 
LabelName("db1", "label1"),
                     jobProperties, routineLoadDataSourceProperties);
             try {
                 stmt.analyze(analyzer);
                 Assert.fail();
             } catch (AnalysisException e) {
-                Assert.assertTrue(e.getMessage().contains("Partition and 
offset must be specified at the same time"));
+                Assert.assertTrue(e.getMessage().contains("Must set offset or 
default offset with partition property"));
             } catch (UserException e) {
                 Assert.fail();
             }
@@ -173,7 +175,7 @@ public class AlterRoutineLoadStmtTest {
             
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, 
"1,2,3");
             
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "1000, 
2000");
             RoutineLoadDataSourceProperties routineLoadDataSourceProperties = 
new RoutineLoadDataSourceProperties(
-                    typeName, dataSourceProperties);
+                    typeName, dataSourceProperties, true);
             AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new 
LabelName("db1", "label1"),
                     jobProperties, routineLoadDataSourceProperties);
             try {
@@ -193,14 +195,14 @@ public class AlterRoutineLoadStmtTest {
             Map<String, String> dataSourceProperties = Maps.newHashMap();
             
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "1000, 
2000, 3000");
             RoutineLoadDataSourceProperties routineLoadDataSourceProperties = 
new RoutineLoadDataSourceProperties(
-                    typeName, dataSourceProperties);
+                    typeName, dataSourceProperties, true);
             AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new 
LabelName("db1", "label1"),
                     jobProperties, routineLoadDataSourceProperties);
             try {
                 stmt.analyze(analyzer);
                 Assert.fail();
             } catch (AnalysisException e) {
-                Assert.assertTrue(e.getMessage().contains("Missing kafka 
partition info"));
+                Assert.assertTrue(e.getMessage().contains("Partitions number 
should be equals to offsets number"));
             } catch (UserException e) {
                 Assert.fail();
             }
@@ -217,7 +219,7 @@ public class AlterRoutineLoadStmtTest {
             
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, 
"1,2,3");
             
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "10000, 
20000, 30000");
             RoutineLoadDataSourceProperties routineLoadDataSourceProperties = 
new RoutineLoadDataSourceProperties(
-                    typeName, dataSourceProperties);
+                    typeName, dataSourceProperties, true);
             AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new 
LabelName("db1", "label1"),
                     jobProperties, routineLoadDataSourceProperties);
             try {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/analysis/RoutineLoadDataSourcePropertiesTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/analysis/RoutineLoadDataSourcePropertiesTest.java
new file mode 100644
index 0000000..bcadc90
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/analysis/RoutineLoadDataSourcePropertiesTest.java
@@ -0,0 +1,347 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.load.routineload.KafkaProgress;
+
+import com.google.common.collect.Maps;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+
+public class RoutineLoadDataSourcePropertiesTest {
+
+    @Test
+    public void testCreateNormal() throws UserException {
+        // normal
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, 
"127.0.0.1:8080");
+        properties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, "test");
+        properties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "0, 1, 
2");
+        properties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "100, 
101, 102");
+        RoutineLoadDataSourceProperties dsProperties = new 
RoutineLoadDataSourceProperties("KAFKA", properties, false);
+        dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE);
+        try {
+            dsProperties.analyze();
+            Assert.assertEquals("127.0.0.1:8080", 
dsProperties.getKafkaBrokerList());
+            Assert.assertEquals("test", dsProperties.getKafkaTopic());
+            List<Pair<Integer, Long>> partitinOffsets = 
dsProperties.getKafkaPartitionOffsets();
+            Assert.assertEquals(3, partitinOffsets.size());
+            Assert.assertEquals(Integer.valueOf(0), 
partitinOffsets.get(0).first);
+            Assert.assertEquals(Integer.valueOf(1), 
partitinOffsets.get(1).first);
+            Assert.assertEquals(Integer.valueOf(2), 
partitinOffsets.get(2).first);
+            Assert.assertEquals(Long.valueOf(100), 
partitinOffsets.get(0).second);
+            Assert.assertEquals(Long.valueOf(101), 
partitinOffsets.get(1).second);
+            Assert.assertEquals(Long.valueOf(102), 
partitinOffsets.get(2).second);
+            Assert.assertFalse(dsProperties.isOffsetsForTimes());
+        } catch (AnalysisException e) {
+            Assert.fail(e.getMessage());
+        }
+
+        // normal, with datetime
+        properties = Maps.newHashMap();
+        properties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, 
"127.0.0.1:8080");
+        properties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, "test");
+        properties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "0, 1, 
2");
+        properties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, 
"2021-10-10 11:00:00, 2021-10-10 11:00:00, 2021-10-10 12:00:00");
+        dsProperties = new RoutineLoadDataSourceProperties("KAFKA", 
properties, false);
+        dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE);
+        try {
+            dsProperties.analyze();
+            Assert.assertEquals("127.0.0.1:8080", 
dsProperties.getKafkaBrokerList());
+            Assert.assertEquals("test", dsProperties.getKafkaTopic());
+            List<Pair<Integer, Long>> partitinOffsets = 
dsProperties.getKafkaPartitionOffsets();
+            Assert.assertEquals(3, partitinOffsets.size());
+            Assert.assertEquals(Integer.valueOf(0), 
partitinOffsets.get(0).first);
+            Assert.assertEquals(Integer.valueOf(1), 
partitinOffsets.get(1).first);
+            Assert.assertEquals(Integer.valueOf(2), 
partitinOffsets.get(2).first);
+            Assert.assertEquals(Long.valueOf(1633834800000L), 
partitinOffsets.get(0).second);
+            Assert.assertEquals(Long.valueOf(1633834800000L), 
partitinOffsets.get(1).second);
+            Assert.assertEquals(Long.valueOf(1633838400000L), 
partitinOffsets.get(2).second);
+            Assert.assertTrue(dsProperties.isOffsetsForTimes());
+        } catch (AnalysisException e) {
+            Assert.fail(e.getMessage());
+        }
+
+        // normal, with default offset as datetime
+        properties = Maps.newHashMap();
+        properties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, 
"127.0.0.1:8080");
+        properties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, "test");
+        properties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "0, 1, 
2");
+        properties.put("property." + 
CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS, "2020-01-10 00:00:00");
+        dsProperties = new RoutineLoadDataSourceProperties("KAFKA", 
properties, false);
+        dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE);
+        try {
+            dsProperties.analyze();
+            Assert.assertEquals("127.0.0.1:8080", 
dsProperties.getKafkaBrokerList());
+            Assert.assertEquals("test", dsProperties.getKafkaTopic());
+            List<Pair<Integer, Long>> partitinOffsets = 
dsProperties.getKafkaPartitionOffsets();
+            Assert.assertEquals(3, partitinOffsets.size());
+            Assert.assertEquals(Integer.valueOf(0), 
partitinOffsets.get(0).first);
+            Assert.assertEquals(Integer.valueOf(1), 
partitinOffsets.get(1).first);
+            Assert.assertEquals(Integer.valueOf(2), 
partitinOffsets.get(2).first);
+            Assert.assertEquals(Long.valueOf(1578585600000L), 
partitinOffsets.get(0).second);
+            Assert.assertEquals(Long.valueOf(1578585600000L), 
partitinOffsets.get(1).second);
+            Assert.assertEquals(Long.valueOf(1578585600000L), 
partitinOffsets.get(2).second);
+            Assert.assertEquals(2, 
dsProperties.getCustomKafkaProperties().size());
+            Assert.assertEquals("1578585600000", 
dsProperties.getCustomKafkaProperties().get(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS));
+            Assert.assertEquals("2020-01-10 00:00:00", 
dsProperties.getCustomKafkaProperties().get(CreateRoutineLoadStmt.KAFKA_ORIGIN_DEFAULT_OFFSETS));
+            Assert.assertTrue(dsProperties.isOffsetsForTimes());
+        } catch (AnalysisException e) {
+            Assert.fail(e.getMessage());
+        }
+
+        // normal, only set default offset as datetime
+        properties = Maps.newHashMap();
+        properties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, 
"127.0.0.1:8080");
+        properties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, "test");
+        properties.put("property." + 
CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS, "2020-01-10 00:00:00");
+        dsProperties = new RoutineLoadDataSourceProperties("KAFKA", 
properties, false);
+        dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE);
+        try {
+            dsProperties.analyze();
+            Assert.assertEquals("127.0.0.1:8080", 
dsProperties.getKafkaBrokerList());
+            Assert.assertEquals("test", dsProperties.getKafkaTopic());
+            List<Pair<Integer, Long>> partitinOffsets = 
dsProperties.getKafkaPartitionOffsets();
+            Assert.assertEquals(0, partitinOffsets.size());
+            Assert.assertEquals(2, 
dsProperties.getCustomKafkaProperties().size());
+            Assert.assertEquals("1578585600000", 
dsProperties.getCustomKafkaProperties().get(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS));
+            Assert.assertEquals("2020-01-10 00:00:00", 
dsProperties.getCustomKafkaProperties().get(CreateRoutineLoadStmt.KAFKA_ORIGIN_DEFAULT_OFFSETS));
+        } catch (AnalysisException e) {
+            Assert.fail(e.getMessage());
+        }
+
+        // normal, only set default offset as integer
+        properties = Maps.newHashMap();
+        properties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, 
"127.0.0.1:8080");
+        properties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, "test");
+        properties.put("property." + 
CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS, KafkaProgress.OFFSET_END);
+        dsProperties = new RoutineLoadDataSourceProperties("KAFKA", 
properties, false);
+        dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE);
+        try {
+            dsProperties.analyze();
+            Assert.assertEquals("127.0.0.1:8080", 
dsProperties.getKafkaBrokerList());
+            Assert.assertEquals("test", dsProperties.getKafkaTopic());
+            List<Pair<Integer, Long>> partitinOffsets = 
dsProperties.getKafkaPartitionOffsets();
+            Assert.assertEquals(0, partitinOffsets.size());
+            Assert.assertEquals(1, 
dsProperties.getCustomKafkaProperties().size());
+            Assert.assertEquals(KafkaProgress.OFFSET_END, 
dsProperties.getCustomKafkaProperties().get(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS));
+        } catch (AnalysisException e) {
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testCreateAbnormal() {
+        // can not set KAFKA_OFFSETS_PROPERTY and KAFKA_DEFAULT_OFFSETS 
together
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, 
"127.0.0.1:8080");
+        properties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, "test");
+        properties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "0, 1, 
2");
+        properties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "1, 1, 
1");
+        properties.put("property." + 
CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS, "-1");
+        RoutineLoadDataSourceProperties dsProperties = new 
RoutineLoadDataSourceProperties("KAFKA", properties, false);
+        dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE);
+        try {
+            dsProperties.analyze();
+            Assert.fail();
+        } catch (UserException e) {
+            Assert.assertTrue(e.getMessage().contains("Only one of 
kafka_offsets and kafka_default_offsets can be set."));
+        }
+
+        // can not set datetime formatted offset and integer offset together
+        properties = Maps.newHashMap();
+        properties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, 
"127.0.0.1:8080");
+        properties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, "test");
+        properties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "0, 1, 
2");
+        properties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "1, 
2020-10-10 12:11:11, 1");
+        dsProperties = new RoutineLoadDataSourceProperties("KAFKA", 
properties, false);
+        dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE);
+        try {
+            dsProperties.analyze();
+            Assert.fail();
+        } catch (UserException e) {
+            Assert.assertTrue(e.getMessage().contains("The offset of the 
partition cannot be specified by the timestamp " +
+                    "and the offset at the same time"));
+        }
+
+        // no partitions but has offset
+        properties = Maps.newHashMap();
+        properties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, 
"127.0.0.1:8080");
+        properties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, "test");
+        properties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "1, 1, 
1");
+        dsProperties = new RoutineLoadDataSourceProperties("KAFKA", 
properties, false);
+        dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE);
+        try {
+            dsProperties.analyze();
+            Assert.fail();
+        } catch (UserException e) {
+            Assert.assertTrue(e.getMessage().contains("Partitions number 
should be equals to offsets number"));
+        }
+    }
+
+    @Test
+    public void testAlterNormal() throws UserException {
+        // normal
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "0, 1, 
2");
+        properties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "100, 
101, 102");
+        RoutineLoadDataSourceProperties dsProperties = new 
RoutineLoadDataSourceProperties("KAFKA", properties, true);
+        dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE);
+        try {
+            dsProperties.analyze();
+            Assert.assertEquals("", dsProperties.getKafkaBrokerList());
+            Assert.assertEquals("", dsProperties.getKafkaTopic());
+            List<Pair<Integer, Long>> partitinOffsets = 
dsProperties.getKafkaPartitionOffsets();
+            Assert.assertEquals(3, partitinOffsets.size());
+            Assert.assertEquals(Integer.valueOf(0), 
partitinOffsets.get(0).first);
+            Assert.assertEquals(Integer.valueOf(1), 
partitinOffsets.get(1).first);
+            Assert.assertEquals(Integer.valueOf(2), 
partitinOffsets.get(2).first);
+            Assert.assertEquals(Long.valueOf(100), 
partitinOffsets.get(0).second);
+            Assert.assertEquals(Long.valueOf(101), 
partitinOffsets.get(1).second);
+            Assert.assertEquals(Long.valueOf(102), 
partitinOffsets.get(2).second);
+        } catch (AnalysisException e) {
+            Assert.fail(e.getMessage());
+        }
+
+        // normal, with datetime
+        properties = Maps.newHashMap();
+        properties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "0, 1, 
2");
+        properties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, 
"2021-10-10 11:00:00, 2021-10-10 11:00:00, 2021-10-10 12:00:00");
+        dsProperties = new RoutineLoadDataSourceProperties("KAFKA", 
properties, true);
+        dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE);
+        try {
+            // expect success: the datetime offsets are converted to timestamps
+            dsProperties.analyze();
+            List<Pair<Integer, Long>> partitinOffsets = 
dsProperties.getKafkaPartitionOffsets();
+            Assert.assertEquals(3, partitinOffsets.size());
+            Assert.assertEquals(Integer.valueOf(0), 
partitinOffsets.get(0).first);
+            Assert.assertEquals(Integer.valueOf(1), 
partitinOffsets.get(1).first);
+            Assert.assertEquals(Integer.valueOf(2), 
partitinOffsets.get(2).first);
+            Assert.assertEquals(Long.valueOf(1633834800000L), 
partitinOffsets.get(0).second);
+            Assert.assertEquals(Long.valueOf(1633834800000L), 
partitinOffsets.get(1).second);
+            Assert.assertEquals(Long.valueOf(1633838400000L), 
partitinOffsets.get(2).second);
+        } catch (AnalysisException e) {
+            Assert.fail(e.getMessage());
+        }
+
+        // normal, with default offset as datetime
+        properties = Maps.newHashMap();
+        properties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "0, 1, 
2");
+        properties.put("property." + 
CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS, "2020-01-10 00:00:00");
+        dsProperties = new RoutineLoadDataSourceProperties("KAFKA", 
properties, true);
+        dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE);
+        try {
+            // expect success: the default datetime offset applies to all given partitions
+            dsProperties.analyze();
+            List<Pair<Integer, Long>> partitinOffsets = 
dsProperties.getKafkaPartitionOffsets();
+            Assert.assertEquals(3, partitinOffsets.size());
+            Assert.assertEquals(Integer.valueOf(0), 
partitinOffsets.get(0).first);
+            Assert.assertEquals(Integer.valueOf(1), 
partitinOffsets.get(1).first);
+            Assert.assertEquals(Integer.valueOf(2), 
partitinOffsets.get(2).first);
+            Assert.assertEquals(Long.valueOf(1578585600000L), 
partitinOffsets.get(0).second);
+            Assert.assertEquals(Long.valueOf(1578585600000L), 
partitinOffsets.get(1).second);
+            Assert.assertEquals(Long.valueOf(1578585600000L), 
partitinOffsets.get(2).second);
+            Assert.assertEquals(2, 
dsProperties.getCustomKafkaProperties().size());
+            Assert.assertEquals("1578585600000", 
dsProperties.getCustomKafkaProperties().get(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS));
+            Assert.assertEquals("2020-01-10 00:00:00", 
dsProperties.getCustomKafkaProperties().get(CreateRoutineLoadStmt.KAFKA_ORIGIN_DEFAULT_OFFSETS));
+        } catch (AnalysisException e) {
+            Assert.fail(e.getMessage());
+        }
+
+        // normal, only set default offset, with utc timezone
+        properties = Maps.newHashMap();
+        properties.put("property." + 
CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS, "2020-01-10 00:00:00");
+        dsProperties = new RoutineLoadDataSourceProperties("KAFKA", 
properties, true);
+        dsProperties.setTimezone(TimeUtils.UTC_TIME_ZONE);
+        try {
+            // expect success: the default offset is converted using the UTC timezone
+            dsProperties.analyze();
+            List<Pair<Integer, Long>> partitinOffsets = 
dsProperties.getKafkaPartitionOffsets();
+            Assert.assertEquals(0, partitinOffsets.size());
+            Assert.assertEquals(2, 
dsProperties.getCustomKafkaProperties().size());
+            Assert.assertEquals("1578614400000", 
dsProperties.getCustomKafkaProperties().get(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS));
+            Assert.assertEquals("2020-01-10 00:00:00", 
dsProperties.getCustomKafkaProperties().get(CreateRoutineLoadStmt.KAFKA_ORIGIN_DEFAULT_OFFSETS));
+        } catch (AnalysisException e) {
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testAlterAbnormal() {
+        // can not set KAFKA_BROKER_LIST_PROPERTY
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, 
"127.0.0.1:8080");
+        properties.put("property." + 
CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS, "-1");
+        RoutineLoadDataSourceProperties dsProperties = new 
RoutineLoadDataSourceProperties("KAFKA", properties, true);
+        dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE);
+        try {
+            dsProperties.analyze();
+            Assert.fail();
+        } catch (UserException e) {
+            Assert.assertTrue(e.getMessage().contains("kafka_broker_list is 
invalid kafka property"));
+        }
+
+        // can not set datetime formatted offset and integer offset together
+        properties = Maps.newHashMap();
+        properties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "0, 1, 
2");
+        properties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "1, 
2020-10-10 12:11:11, 1");
+        dsProperties = new RoutineLoadDataSourceProperties("KAFKA", 
properties, true);
+        dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE);
+        try {
+            // expect failure: mixed datetime and integer offsets
+            dsProperties.analyze();
+            Assert.fail();
+        } catch (UserException e) {
+            Assert.assertTrue(e.getMessage().contains("The offset of the 
partition cannot be specified by the timestamp " +
+                    "and the offset at the same time"));
+        }
+
+        // no partitions but has offset
+        properties = Maps.newHashMap();
+        properties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "1, 1, 
1");
+        dsProperties = new RoutineLoadDataSourceProperties("KAFKA", 
properties, true);
+        dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE);
+        try {
+            // expect failure: offsets given without partitions
+            dsProperties.analyze();
+            Assert.fail();
+        } catch (UserException e) {
+            Assert.assertTrue(e.getMessage().contains("Partitions number 
should be equals to offsets number"));
+        }
+
+        // only set partition
+        properties = Maps.newHashMap();
+        properties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "1, 1, 
1");
+        dsProperties = new RoutineLoadDataSourceProperties("KAFKA", 
properties, true);
+        dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE);
+        try {
+            dsProperties.analyze();
+            Assert.fail();
+        } catch (UserException e) {
+            Assert.assertTrue(e.getMessage().contains("Must set offset or 
default offset with partition property"));
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
index 701eb80..7ce75a7 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
@@ -17,12 +17,13 @@
 
 package org.apache.doris.load.routineload;
 
-import org.apache.doris.analysis.Separator;
 import org.apache.doris.analysis.CreateRoutineLoadStmt;
 import org.apache.doris.analysis.ImportSequenceStmt;
 import org.apache.doris.analysis.LabelName;
 import org.apache.doris.analysis.ParseNode;
 import org.apache.doris.analysis.PartitionNames;
+import org.apache.doris.analysis.RoutineLoadDataSourceProperties;
+import org.apache.doris.analysis.Separator;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.OlapTable;
@@ -43,6 +44,10 @@ import org.apache.doris.thrift.TResourceInfo;
 import org.apache.doris.transaction.BeginTransactionException;
 import org.apache.doris.transaction.GlobalTransactionMgr;
 
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -50,10 +55,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -279,9 +280,12 @@ public class KafkaRoutineLoadJobTest {
             PartitionInfo partitionInfo = new PartitionInfo(topicName, 
Integer.valueOf(s), null, null, null);
             kafkaPartitionInfoList.add(partitionInfo);
         }
-        Deencapsulation.setField(createRoutineLoadStmt, 
"kafkaPartitionOffsets", partitionIdToOffset);
-        Deencapsulation.setField(createRoutineLoadStmt, "kafkaBrokerList", 
serverAddress);
-        Deencapsulation.setField(createRoutineLoadStmt, "kafkaTopic", 
topicName);
+        RoutineLoadDataSourceProperties dsProperties = new 
RoutineLoadDataSourceProperties();
+        dsProperties.setKafkaPartitionOffsets(partitionIdToOffset);
+        Deencapsulation.setField(dsProperties, "kafkaBrokerList", 
serverAddress);
+        Deencapsulation.setField(dsProperties, "kafkaTopic", topicName);
+        Deencapsulation.setField(createRoutineLoadStmt, 
"dataSourceProperties", dsProperties);
+
         long dbId = 1l;
         long tableId = 2L;
 
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java
index 3cba3fb..f09a509 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java
@@ -19,7 +19,8 @@ package org.apache.doris.persist;
 
 import org.apache.doris.analysis.CreateRoutineLoadStmt;
 import org.apache.doris.analysis.RoutineLoadDataSourceProperties;
-import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.TimeUtils;
 
 import com.google.common.collect.Maps;
 
@@ -38,7 +39,7 @@ public class AlterRoutineLoadOperationLogTest {
     private static String fileName = "./AlterRoutineLoadOperationLogTest";
 
     @Test
-    public void testSerializeAlterViewInfo() throws IOException, 
AnalysisException {
+    public void testSerializeAlterRoutineLoadOperationLog() throws 
IOException, UserException {
         // 1. Write objects to file
         File file = new File(fileName);
         file.createNewFile();
@@ -47,14 +48,15 @@ public class AlterRoutineLoadOperationLogTest {
         long jobId = 1000;
         Map<String, String> jobProperties = Maps.newHashMap();
         
jobProperties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, 
"5");
-        
+
         String typeName = "kafka";
         Map<String, String> dataSourceProperties = Maps.newHashMap();
         
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "0, 
1");
         dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, 
"10000, 20000");
         dataSourceProperties.put("property.group.id", "mygroup");
         RoutineLoadDataSourceProperties routineLoadDataSourceProperties = new 
RoutineLoadDataSourceProperties(typeName,
-                dataSourceProperties);
+                dataSourceProperties, true);
+        
routineLoadDataSourceProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE);
         routineLoadDataSourceProperties.analyze();
 
         AlterRoutineLoadJobOperationLog log = new 
AlterRoutineLoadJobOperationLog(jobId,
@@ -69,6 +71,8 @@ public class AlterRoutineLoadOperationLogTest {
         AlterRoutineLoadJobOperationLog log2 = 
AlterRoutineLoadJobOperationLog.read(in);
         Assert.assertEquals(1, log2.getJobProperties().size());
         Assert.assertEquals("5", 
log2.getJobProperties().get(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY));
+        Assert.assertEquals("", 
log2.getDataSourceProperties().getKafkaBrokerList());
+        Assert.assertEquals("", 
log2.getDataSourceProperties().getKafkaTopic());
         Assert.assertEquals(1, 
log2.getDataSourceProperties().getCustomKafkaProperties().size());
         Assert.assertEquals("mygroup", 
log2.getDataSourceProperties().getCustomKafkaProperties().get("group.id"));
         
Assert.assertEquals(routineLoadDataSourceProperties.getKafkaPartitionOffsets().get(0),
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index 1a50e94..437fc2f 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -216,6 +216,11 @@ message PStringPair {
     required string val = 2;
 };
 
+message PIntegerPair {
+    required int32 key = 1;
+    required int64 val = 2;
+};
+
 message PKafkaLoadInfo {
     required string brokers = 1;
     required string topic = 2;
@@ -224,6 +229,7 @@ message PKafkaLoadInfo {
 
 message PKafkaMetaProxyRequest {
     optional PKafkaLoadInfo kafka_info = 1;
+    repeated PIntegerPair offset_times = 3;
 };
 
 message PProxyRequest {
@@ -234,9 +240,14 @@ message PKafkaMetaProxyResult {
     repeated int32 partition_ids = 1;
 };
 
+message PKafkaPartitionOffsets {
+    repeated PIntegerPair offset_times = 1;
+};
+
 message PProxyResult {
     required PStatus status = 1;
     optional PKafkaMetaProxyResult kafka_meta_result = 2;
+    optional PKafkaPartitionOffsets partition_offsets = 3;
 };
 
 // NOTE(zc): If you want to add new method here,
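
The new messages are symmetric on the wire: the FE sends offset_times as <partition, timestamp> pairs and reads the same pair shape back as <partition, offset>. A small sketch using the generated builders (mirroring the KafkaUtil code above; hypothetical class name):

```java
import org.apache.doris.proto.InternalService;

public class OffsetTimesWireExample {
    public static void main(String[] args) {
        // Request side: <partition, timestamp> pair, added to PKafkaMetaProxyRequest.offset_times
        InternalService.PIntegerPair pair = InternalService.PIntegerPair.newBuilder()
                .setKey(0)                // partition id
                .setVal(1633834800000L)   // timestamp in ms
                .build();
        System.out.println(pair.getKey() + " -> " + pair.getVal());
        // Response side: the BE fills PKafkaPartitionOffsets.offset_times with <partition, offset>
        // pairs of the same shape, read via result.getPartitionOffsets().getOffsetTimesList().
    }
}
```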

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
