This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 07ad038 [Feature][RoutineLoad] Support for consuming kafka from the
point of time (#5832)
07ad038 is described below
commit 07ad03887094fd4b70c98d13ce79f6a7debb92ac
Author: Mingyu Chen <[email protected]>
AuthorDate: Sat May 22 23:37:53 2021 +0800
[Feature][RoutineLoad] Support for consuming kafka from the point of time
(#5832)
Support when creating a kafka routine load, start consumption from a
specified point in time instead of a specific offset.
eg:
```
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092",
"kafka_topic" = "my_topic",
"property.kafka_default_offsets" = "2021-10-10 11:00:00"
);
or
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092",
"kafka_topic" = "my_topic",
"kafka_partitions" = "0,1,2",
"kafka_offsets" = "2021-10-10 11:00:00, 2021-10-10 11:00:00, 2021-10-10
12:00:00"
);
```
This PR also reconstructed the analysis method of properties when creating
or altering
routine load jobs, and unified the analysis process in the
`RoutineLoadDataSourceProperties` class.
---
be/src/runtime/routine_load/data_consumer.cpp | 43 +++
be/src/runtime/routine_load/data_consumer.h | 4 +
.../routine_load/routine_load_task_executor.cpp | 50 ++-
.../routine_load/routine_load_task_executor.h | 5 +
be/src/service/internal_service.cpp | 43 ++-
.../load-data/routine-load-manual.md | 21 ++
.../Data Manipulation/ROUTINE LOAD.md | 25 ++
.../load-data/routine-load-manual.md | 20 ++
.../Data Manipulation/ROUTINE LOAD.md | 23 ++
fe/fe-core/src/main/cup/sql_parser.cup | 3 +-
.../doris/analysis/AlterRoutineLoadStmt.java | 15 +-
.../doris/analysis/CreateRoutineLoadStmt.java | 216 ++-----------
.../analysis/RoutineLoadDataSourceProperties.java | 309 ++++++++++++++++--
.../org/apache/doris/common/util/KafkaUtil.java | 74 +++++
.../org/apache/doris/common/util/TimeUtils.java | 16 +-
.../load/routineload/KafkaRoutineLoadJob.java | 72 ++++-
.../doris/load/routineload/RoutineLoadJob.java | 10 +-
.../doris/load/routineload/RoutineLoadManager.java | 2 +-
.../doris/analysis/AlterRoutineLoadStmtTest.java | 22 +-
.../RoutineLoadDataSourcePropertiesTest.java | 347 +++++++++++++++++++++
.../load/routineload/KafkaRoutineLoadJobTest.java | 20 +-
.../persist/AlterRoutineLoadOperationLogTest.java | 12 +-
gensrc/proto/internal_service.proto | 11 +
23 files changed, 1086 insertions(+), 277 deletions(-)
diff --git a/be/src/runtime/routine_load/data_consumer.cpp
b/be/src/runtime/routine_load/data_consumer.cpp
index 301ad24..ea712c9 100644
--- a/be/src/runtime/routine_load/data_consumer.cpp
+++ b/be/src/runtime/routine_load/data_consumer.cpp
@@ -23,6 +23,7 @@
#include <vector>
#include "common/status.h"
+#include "gen_cpp/internal_service.pb.h"
#include "gutil/strings/split.h"
#include "runtime/small_file_mgr.h"
#include "service/backend_options.h"
@@ -293,6 +294,48 @@ Status
KafkaDataConsumer::get_partition_meta(std::vector<int32_t>* partition_ids
return Status::OK();
}
+// get offsets of each partition for times.
+// The input parameter "times" holds <partition, timestamps>
+// The output parameter "offsets" returns <partition, offsets>
+//
+// The returned offset for each partition is the earliest offset whose
+// timestamp is greater than or equal to the given timestamp in the
+// corresponding partition.
+// See librdkafka/rdkafkacpp.h##offsetsForTimes()
+Status KafkaDataConsumer::get_offsets_for_times(const
std::vector<PIntegerPair>& times,
+ std::vector<PIntegerPair>* offsets) {
+ // create topic partition
+ std::vector<RdKafka::TopicPartition*> topic_partitions;
+ for (const auto& entry : times) {
+ RdKafka::TopicPartition* tp1 =
+ RdKafka::TopicPartition::create(_topic, entry.key(),
entry.val());
+ topic_partitions.push_back(tp1);
+ }
+ // delete TopicPartition finally
+ Defer delete_tp{[&topic_partitions]() {
+ std::for_each(topic_partitions.begin(), topic_partitions.end(),
+ [](RdKafka::TopicPartition* tp1) { delete tp1; });
+ }};
+
+ // get offsets for times
+ RdKafka::ErrorCode err = _k_consumer->offsetsForTimes(topic_partitions,
5000);
+ if (err != RdKafka::ERR_NO_ERROR) {
+ std::stringstream ss;
+ ss << "failed to get offsets for times: " << RdKafka::err2str(err);
+ LOG(WARNING) << ss.str();
+ return Status::InternalError(ss.str());
+ }
+
+ for (const auto& topic_partition : topic_partitions) {
+ PIntegerPair pair;
+ pair.set_key(topic_partition->partition());
+ pair.set_val(topic_partition->offset());
+ offsets->push_back(pair);
+ }
+
+ return Status::OK();
+}
+
Status KafkaDataConsumer::cancel(StreamLoadContext* ctx) {
std::unique_lock<std::mutex> l(_lock);
if (!_init) {
diff --git a/be/src/runtime/routine_load/data_consumer.h
b/be/src/runtime/routine_load/data_consumer.h
index f0c1c6e..fcbd017 100644
--- a/be/src/runtime/routine_load/data_consumer.h
+++ b/be/src/runtime/routine_load/data_consumer.h
@@ -77,6 +77,7 @@ protected:
time_t _last_visit_time;
};
+class PIntegerPair;
class KafkaEventCb : public RdKafka::EventCb {
public:
void event_cb(RdKafka::Event& event) {
@@ -141,6 +142,9 @@ public:
// get the partitions ids of the topic
Status get_partition_meta(std::vector<int32_t>* partition_ids);
+ // get offsets for times
+ Status get_offsets_for_times(const std::vector<PIntegerPair>& times,
+ std::vector<PIntegerPair>* offsets);
private:
std::string _brokers;
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp
b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index 8bcbfa9..f8f9c71 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -63,15 +63,13 @@ RoutineLoadTaskExecutor::~RoutineLoadTaskExecutor() {
_task_map.clear();
}
-Status RoutineLoadTaskExecutor::get_kafka_partition_meta(const
PKafkaMetaProxyRequest& request,
- std::vector<int32_t>*
partition_ids) {
- DCHECK(request.has_kafka_info());
-
- // This context is meaningless, just for unifing the interface
- StreamLoadContext ctx(_exec_env);
- ctx.load_type = TLoadType::ROUTINE_LOAD;
- ctx.load_src_type = TLoadSourceType::KAFKA;
- ctx.label = "NaN";
+// Create a temp StreamLoadContext and set some kafka connection info in it.
+// So that we can use this ctx to get kafka data consumer instance.
+Status RoutineLoadTaskExecutor::_prepare_ctx(const PKafkaMetaProxyRequest&
request,
+ StreamLoadContext* ctx) {
+ ctx->load_type = TLoadType::ROUTINE_LOAD;
+ ctx->load_src_type = TLoadSourceType::KAFKA;
+ ctx->label = "NaN";
// convert PKafkaInfo to TKafkaLoadInfo
TKafkaLoadInfo t_info;
@@ -84,8 +82,18 @@ Status
RoutineLoadTaskExecutor::get_kafka_partition_meta(const PKafkaMetaProxyRe
}
t_info.__set_properties(std::move(properties));
- ctx.kafka_info.reset(new KafkaLoadInfo(t_info));
- ctx.need_rollback = false;
+ ctx->kafka_info.reset(new KafkaLoadInfo(t_info));
+ ctx->need_rollback = false;
+ return Status::OK();
+}
+
+Status RoutineLoadTaskExecutor::get_kafka_partition_meta(const
PKafkaMetaProxyRequest& request,
+ std::vector<int32_t>*
partition_ids) {
+ CHECK(request.has_kafka_info());
+
+ // This context is meaningless, just for unifying the interface
+ StreamLoadContext ctx(_exec_env);
+ RETURN_IF_ERROR(_prepare_ctx(request, &ctx));
std::shared_ptr<DataConsumer> consumer;
RETURN_IF_ERROR(_data_consumer_pool.get_consumer(&ctx, &consumer));
@@ -98,6 +106,26 @@ Status
RoutineLoadTaskExecutor::get_kafka_partition_meta(const PKafkaMetaProxyRe
return st;
}
+Status RoutineLoadTaskExecutor::get_kafka_partition_offsets_for_times(const
PKafkaMetaProxyRequest& request,
+ std::vector<PIntegerPair>* partition_offsets) {
+ CHECK(request.has_kafka_info());
+
+ // This context is meaningless, just for unifying the interface
+ StreamLoadContext ctx(_exec_env);
+ RETURN_IF_ERROR(_prepare_ctx(request, &ctx));
+
+ std::shared_ptr<DataConsumer> consumer;
+ RETURN_IF_ERROR(_data_consumer_pool.get_consumer(&ctx, &consumer));
+
+ Status st =
std::static_pointer_cast<KafkaDataConsumer>(consumer)->get_offsets_for_times(
+ std::vector<PIntegerPair>(request.offset_times().begin(),
request.offset_times().end()),
+ partition_offsets);
+ if (st.ok()) {
+ _data_consumer_pool.return_consumer(consumer);
+ }
+ return st;
+}
+
Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
std::unique_lock<std::mutex> l(_lock);
if (_task_map.find(task.id) != _task_map.end()) {
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h
b/be/src/runtime/routine_load/routine_load_task_executor.h
index ffc8b2c..e5c7939 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.h
+++ b/be/src/runtime/routine_load/routine_load_task_executor.h
@@ -52,6 +52,9 @@ public:
Status get_kafka_partition_meta(const PKafkaMetaProxyRequest& request,
std::vector<int32_t>* partition_ids);
+ Status get_kafka_partition_offsets_for_times(const PKafkaMetaProxyRequest&
request,
+ std::vector<PIntegerPair>* partition_offsets);
+
private:
// execute the task
void exec_task(StreamLoadContext* ctx, DataConsumerPool* pool,
ExecFinishCallback cb);
@@ -60,6 +63,8 @@ private:
// for test only
Status _execute_plan_for_test(StreamLoadContext* ctx);
+ // create a dummy StreamLoadContext for PKafkaMetaProxyRequest
+ Status _prepare_ctx(const PKafkaMetaProxyRequest& request,
StreamLoadContext* ctx);
private:
ExecEnv* _exec_env;
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index 62e917f..8023751 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -187,19 +187,42 @@ void
PInternalServiceImpl<T>::get_info(google::protobuf::RpcController* controll
const PProxyRequest* request,
PProxyResult* response,
google::protobuf::Closure* done) {
brpc::ClosureGuard closure_guard(done);
+ // PProxyRequest is defined in gensrc/proto/internal_service.proto
+ // Currently it supports 2 kinds of requests:
+ // 1. get all kafka partition ids for given topic
+ // 2. get all kafka partition offsets for given topic and timestamp.
if (request->has_kafka_meta_request()) {
- std::vector<int32_t> partition_ids;
- Status st =
_exec_env->routine_load_task_executor()->get_kafka_partition_meta(
- request->kafka_meta_request(), &partition_ids);
- if (st.ok()) {
- PKafkaMetaProxyResult* kafka_result =
response->mutable_kafka_meta_result();
- for (int32_t id : partition_ids) {
- kafka_result->add_partition_ids(id);
+ const PKafkaMetaProxyRequest& kafka_request =
request->kafka_meta_request();
+ if (!kafka_request.offset_times().empty()) {
+ // if offset_times() has elements, which means this request is to
get offset by timestamp.
+ std::vector<PIntegerPair> partition_offsets;
+ Status st =
_exec_env->routine_load_task_executor()->get_kafka_partition_offsets_for_times(
+ request->kafka_meta_request(), &partition_offsets);
+ if (st.ok()) {
+ PKafkaPartitionOffsets* part_offsets =
response->mutable_partition_offsets();
+ for (const auto& entry : partition_offsets) {
+ PIntegerPair* res = part_offsets->add_offset_times();
+ res->set_key(entry.key());
+ res->set_val(entry.val());
+ }
}
+ st.to_protobuf(response->mutable_status());
+ return;
+ } else {
+ // get partition ids of topic
+ std::vector<int32_t> partition_ids;
+ Status st =
_exec_env->routine_load_task_executor()->get_kafka_partition_meta(
+ request->kafka_meta_request(), &partition_ids);
+ if (st.ok()) {
+ PKafkaMetaProxyResult* kafka_result =
response->mutable_kafka_meta_result();
+ for (int32_t id : partition_ids) {
+ kafka_result->add_partition_ids(id);
+ }
+ }
+ st.to_protobuf(response->mutable_status());
+ return;
}
- st.to_protobuf(response->mutable_status());
- return;
- }
+ }
Status::OK().to_protobuf(response->mutable_status());
}
diff --git a/docs/en/administrator-guide/load-data/routine-load-manual.md
b/docs/en/administrator-guide/load-data/routine-load-manual.md
index 592ebe5..ab29901 100644
--- a/docs/en/administrator-guide/load-data/routine-load-manual.md
+++ b/docs/en/administrator-guide/load-data/routine-load-manual.md
@@ -273,11 +273,32 @@ The user can control the stop, pause and restart of the
job by the three command
* If the broker of the user kafka cluster has `auto.create.topics.enable =
false` set, topic will not be created automatically, and the routine will be
paused before any data is read, with the status `PAUSED`.
So, if the user wants the topic to be created automatically by the routine load job when the
kafka topic does not exist, just set `auto.create.topics.enable = true` for the brokers in
the kafka cluster on the user's side.
+
5. Problems that may occur in some environments
In some environments, there are isolation measures for network segment
and domain name resolution. So should pay attention to:
1. The broker list specified in the routine load task must be
accessible on the doris environment.
2. If `advertised.listeners` is configured in kafka, The addresses in
`advertised.listeners` need to be accessible on the doris environment.
+6. About specified Partition and Offset
+
+ Doris supports specifying Partition and Offset to start consumption. The
new version also supports the consumption function at a specified time point.
The configuration relationship of the corresponding parameters is explained
here.
+
+ There are three relevant parameters:
+
+ * `kafka_partitions`: Specify the list of partitions to be consumed, such
as: "0, 1, 2, 3".
+ * `kafka_offsets`: Specify the starting offset of each partition, which
must correspond to the number of `kafka_partitions` lists. Such as: "1000,
1000, 2000, 2000"
+ * `property.kafka_default_offsets`: Specify the default starting offset of
the partition.
+
+ When creating an routine load job, these three parameters can have the
following combinations:
+
+ | Combinations | `kafka_partitions` | `kafka_offsets` |
`property.kafka_default_offsets` | Behavior |
+ |---|---|---|---|---|
+ |1| No | No | No | The system will automatically find all the partitions
corresponding to the topic and start consumption from OFFSET_END |
+ |2| No | No | Yes | The system will automatically find all the partitions
corresponding to the topic and start consumption from the position specified by
the default offset |
+ |3| Yes | No | No | The system will start consumption from the OFFSET_END
of the specified partition |
+ |4| Yes | Yes | No | The system will start consumption from the specified
offset of the specified partition |
+ |5| Yes | No | Yes | The system will start consumption from the specified
partition and the location specified by the default offset |
+
## Related parameters
Some system configuration parameters can affect the use of routine loads.
diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/ROUTINE
LOAD.md b/docs/en/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md
index ea6487f..ee0e775 100644
--- a/docs/en/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md
+++ b/docs/en/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md
@@ -243,6 +243,9 @@ FROM data_source
2) OFFSET_END: Subscribe from the end.
+ 3) Timestamp, the format must be like: "2021-05-11 10:00:00", the
system will automatically locate the offset of the first message greater than
or equal to the timestamp.
+ Note that the offset of the timestamp format cannot be mixed with
the number type, only one of them can be selected.
+
If not specified, all partitions under topic are subscribed by default
from OFFSET_END.
Example:
@@ -250,6 +253,9 @@ FROM data_source
```
"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END"
+
+ "kafka_partitions" = "0,1",
+ "kafka_offsets" = "2021-05-11 10:00:00, 2021-05-11 11:00:00"
```
4. property
@@ -305,9 +311,12 @@ FROM data_source
2) OFFSET_END: Subscribe from the end.
+ 3) Timestamp, the format is the same as kafka_offsets
+
Example:
`"property.kafka_default_offsets" = "OFFSET_BEGINNING"`
+ `"property.kafka_default_offsets" = "2021-05-11 10:00:00"`
8. load data format sample
@@ -552,6 +561,22 @@ FROM data_source
"kafka_offsets" = "101,0,0,200"
);
+ 9. Start consumption from the specified point in time
+
+ CREATE ROUTINE LOAD example_db.test_job ON example_tbl
+ PROPERTIES
+ (
+ "desired_concurrent_number"="3",
+ "max_batch_interval" = "30",
+ "max_batch_rows" = "300000",
+ "max_batch_size" = "209715200"
+ ) FROM KAFKA
+ (
+ "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
+ "kafka_topic" = "my_topic",
+ "property.kafka_default_offsets" = "2021-10-10 11:00:00"
+ );
+
## keyword
CREATE, ROUTINE, LOAD
diff --git a/docs/zh-CN/administrator-guide/load-data/routine-load-manual.md
b/docs/zh-CN/administrator-guide/load-data/routine-load-manual.md
index 0a28012..976446b 100644
--- a/docs/zh-CN/administrator-guide/load-data/routine-load-manual.md
+++ b/docs/zh-CN/administrator-guide/load-data/routine-load-manual.md
@@ -277,6 +277,26 @@ FE 中的 JobScheduler 根据汇报结果,继续生成后续新的 Task,或
1. 创建Routine load 任务中指定的 Broker list 必须能够被Doris服务访问
2. Kafka 中如果配置了`advertised.listeners`, `advertised.listeners`
中的地址必须能够被Doris服务访问
+6. 关于指定消费的 Partition 和 Offset
+
+ Doris 支持指定 Partition 和 Offset 开始消费。新版中还支持了指定时间点进行消费的功能。这里说明下对应参数的配置关系。
+
+ 有三个相关参数:
+
+ * `kafka_partitions`:指定待消费的 partition 列表,如:"0, 1, 2, 3"。
+ * `kafka_offsets`:指定每个分区的起始offset,必须和 `kafka_partitions` 列表个数对应。如:"1000,
1000, 2000, 2000"
+ * `property.kafka_default_offsets`:指定分区默认的起始offset。
+
+ 在创建导入作业时,这三个参数可以有以下组合:
+
+ | 组合 | `kafka_partitions` | `kafka_offsets` |
`property.kafka_default_offsets` | 行为 |
+ |---|---|---|---|---|
+ |1| No | No | No | 系统会自动查找topic对应的所有分区并从 OFFSET_END 开始消费 |
+ |2| No | No | Yes | 系统会自动查找topic对应的所有分区并从 default offset 指定的位置开始消费|
+ |3| Yes | No | No | 系统会从指定分区的 OFFSET_END 开始消费 |
+ |4| Yes | Yes | No | 系统会从指定分区的指定offset 处开始消费 |
+ |5| Yes | No | Yes | 系统会从指定分区,default offset 指定的位置开始消费 |
+
## 相关参数
一些系统配置参数会影响例行导入的使用。
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/ROUTINE
LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/ROUTINE
LOAD.md
index 65fd54c..4200ad4 100644
--- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md
+++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md
@@ -220,6 +220,7 @@ under the License.
offset 可以指定从大于等于 0 的具体 offset,或者:
1) OFFSET_BEGINNING: 从有数据的位置开始订阅。
2) OFFSET_END: 从末尾开始订阅。
+ 3) 时间戳,格式必须如:"2021-05-11
10:00:00",系统会自动定位到大于等于该时间戳的第一个消息的offset。注意,时间戳格式的offset不能和数字类型混用,只能选其一。
如果没有指定,则默认从 OFFSET_END 开始订阅 topic 下的所有 partition。
示例:
@@ -227,6 +228,9 @@ under the License.
"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END"
+ "kafka_partitions" = "0,1",
+ "kafka_offsets" = "2021-05-11 10:00:00, 2021-05-11
11:00:00"
+
4. property
指定自定义kafka参数。
@@ -264,8 +268,11 @@ under the License.
值为
1) OFFSET_BEGINNING: 从有数据的位置开始订阅。
2) OFFSET_END: 从末尾开始订阅。
+ 3) 时间戳,格式同 kafka_offsets
+
示例:
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ "property.kafka_default_offsets" = "2021-05-11 10:00:00"
8. 导入数据格式样例
@@ -507,6 +514,22 @@ under the License.
"kafka_offsets" = "101,0,0,200"
);
+ 9. 从指定的时间点开始消费
+
+ CREATE ROUTINE LOAD example_db.test_job ON example_tbl
+ PROPERTIES
+ (
+ "desired_concurrent_number"="3",
+ "max_batch_interval" = "30",
+ "max_batch_rows" = "300000",
+ "max_batch_size" = "209715200"
+ ) FROM KAFKA
+ (
+ "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
+ "kafka_topic" = "my_topic",
+ "property.kafka_default_offsets" = "2021-10-10 11:00:00"
+ );
+
## keyword
CREATE,ROUTINE,LOAD
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup
b/fe/fe-core/src/main/cup/sql_parser.cup
index ea6c6d6..02aa5ed 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -803,7 +803,8 @@ opt_datasource_properties ::=
:}
| KW_FROM ident:type LPAREN key_value_map:customProperties RPAREN
{:
- RESULT = new RoutineLoadDataSourceProperties(type, customProperties);
+ // the 3rd parameter "true" means this is for AlterRoutineLoad
operation.
+ RESULT = new RoutineLoadDataSourceProperties(type, customProperties,
true);
:}
;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java
index 47fb380..0bd3f6f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java
@@ -17,11 +17,14 @@
package org.apache.doris.analysis;
+import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
+import org.apache.doris.load.routineload.RoutineLoadJob;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
@@ -42,7 +45,7 @@ public class AlterRoutineLoadStmt extends DdlStmt {
private static final String NAME_TYPE = "ROUTINE LOAD NAME";
- private static final ImmutableSet<String> CONFIGURABLE_PROPERTIES_SET =
new ImmutableSet.Builder<String>()
+ private static final ImmutableSet<String> CONFIGURABLE_JOB_PROPERTIES_SET
= new ImmutableSet.Builder<String>()
.add(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY)
.add(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY)
.add(CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY)
@@ -110,7 +113,7 @@ public class AlterRoutineLoadStmt extends DdlStmt {
private void checkJobProperties() throws UserException {
Optional<String> optional = jobProperties.keySet().stream().filter(
- entity ->
!CONFIGURABLE_PROPERTIES_SET.contains(entity)).findFirst();
+ entity ->
!CONFIGURABLE_JOB_PROPERTIES_SET.contains(entity)).findFirst();
if (optional.isPresent()) {
throw new AnalysisException(optional.get() + " is invalid
property");
}
@@ -194,7 +197,13 @@ public class AlterRoutineLoadStmt extends DdlStmt {
}
}
- private void checkDataSourceProperties() throws AnalysisException {
+ private void checkDataSourceProperties() throws UserException {
+ if (!FeConstants.runningUnitTest) {
+ RoutineLoadJob job =
Catalog.getCurrentCatalog().getRoutineLoadManager().checkPrivAndGetJob(getDbName(),
getLabel());
+ dataSourceProperties.setTimezone(job.getTimezone());
+ } else {
+ dataSourceProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE);
+ }
dataSourceProperties.analyze();
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
index a828b1a..ffd782a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
@@ -31,21 +31,20 @@ import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.load.RoutineLoadDesc;
import org.apache.doris.load.loadv2.LoadTask;
-import org.apache.doris.load.routineload.KafkaProgress;
-import org.apache.doris.load.routineload.LoadDataSourceType;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.qe.ConnectContext;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;
-import java.util.regex.Pattern;
/*
Create routine Load statement, continually load data from a streaming app
@@ -87,6 +86,8 @@ import java.util.regex.Pattern;
KAFKA
*/
public class CreateRoutineLoadStmt extends DdlStmt {
+ private static final Logger LOG =
LogManager.getLogger(CreateRoutineLoadStmt.class);
+
// routine load properties
public static final String DESIRED_CONCURRENT_NUMBER_PROPERTY =
"desired_concurrent_number";
// max error number in ten thousand records
@@ -111,9 +112,10 @@ public class CreateRoutineLoadStmt extends DdlStmt {
public static final String KAFKA_PARTITIONS_PROPERTY = "kafka_partitions";
public static final String KAFKA_OFFSETS_PROPERTY = "kafka_offsets";
public static final String KAFKA_DEFAULT_OFFSETS = "kafka_default_offsets";
-
+ public static final String KAFKA_ORIGIN_DEFAULT_OFFSETS =
"kafka_origin_default_offsets";
+
private static final String NAME_TYPE = "ROUTINE LOAD NAME";
- private static final String ENDPOINT_REGEX =
"[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]";
+ public static final String ENDPOINT_REGEX =
"[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]";
private static final ImmutableSet<String> PROPERTIES_SET = new
ImmutableSet.Builder<String>()
.add(DESIRED_CONCURRENT_NUMBER_PROPERTY)
@@ -132,19 +134,12 @@ public class CreateRoutineLoadStmt extends DdlStmt {
.add(EXEC_MEM_LIMIT_PROPERTY)
.build();
- private static final ImmutableSet<String> KAFKA_PROPERTIES_SET = new
ImmutableSet.Builder<String>()
- .add(KAFKA_BROKER_LIST_PROPERTY)
- .add(KAFKA_TOPIC_PROPERTY)
- .add(KAFKA_PARTITIONS_PROPERTY)
- .add(KAFKA_OFFSETS_PROPERTY)
- .build();
-
private final LabelName labelName;
private final String tableName;
private final List<ParseNode> loadPropertyList;
private final Map<String, String> jobProperties;
private final String typeName;
- private final Map<String, String> dataSourceProperties;
+ private final RoutineLoadDataSourceProperties dataSourceProperties;
// the following variables will be initialized after analyze
// -1 as unset, the default value will set in RoutineLoadJob
@@ -165,29 +160,21 @@ public class CreateRoutineLoadStmt extends DdlStmt {
* 1) dataFormat = "json"
* 2) jsonPaths = "$.XXX.xxx"
*/
- private String format = ""; //default is csv.
- private String jsonPaths = "";
- private String jsonRoot = ""; // MUST be a jsonpath string
+ private String format = ""; //default is csv.
+ private String jsonPaths = "";
+ private String jsonRoot = ""; // MUST be a jsonpath string
private boolean stripOuterArray = false;
private boolean numAsString = false;
private boolean fuzzyParse = false;
- // kafka related properties
- private String kafkaBrokerList;
- private String kafkaTopic;
- // pair<partition id, offset>
- private List<Pair<Integer, Long>> kafkaPartitionOffsets =
Lists.newArrayList();
-
- // custom kafka property map<key, value>
- private Map<String, String> customKafkaProperties = Maps.newHashMap();
private LoadTask.MergeType mergeType;
- public static final Predicate<Long> DESIRED_CONCURRENT_NUMBER_PRED = (v)
-> { return v > 0L; };
- public static final Predicate<Long> MAX_ERROR_NUMBER_PRED = (v) -> {
return v >= 0L; };
- public static final Predicate<Long> MAX_BATCH_INTERVAL_PRED = (v) -> {
return v >= 5 && v <= 60; };
- public static final Predicate<Long> MAX_BATCH_ROWS_PRED = (v) -> { return
v >= 200000; };
- public static final Predicate<Long> MAX_BATCH_SIZE_PRED = (v) -> { return
v >= 100 * 1024 * 1024 && v <= 1024 * 1024 * 1024; };
- public static final Predicate<Long> EXEC_MEM_LIMIT_PRED = (v) -> { return
v >= 0L; };
+ public static final Predicate<Long> DESIRED_CONCURRENT_NUMBER_PRED = (v)
-> v > 0L;
+ public static final Predicate<Long> MAX_ERROR_NUMBER_PRED = (v) -> v >= 0L;
+ public static final Predicate<Long> MAX_BATCH_INTERVAL_PRED = (v) -> v >=
5 && v <= 60;
+ public static final Predicate<Long> MAX_BATCH_ROWS_PRED = (v) -> v >=
200000;
+ public static final Predicate<Long> MAX_BATCH_SIZE_PRED = (v) -> v >= 100
* 1024 * 1024 && v <= 1024 * 1024 * 1024;
+ public static final Predicate<Long> EXEC_MEM_LIMIT_PRED = (v) -> v >= 0L;
public CreateRoutineLoadStmt(LabelName labelName, String tableName,
List<ParseNode> loadPropertyList,
Map<String, String> jobProperties, String
typeName,
@@ -197,7 +184,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
this.loadPropertyList = loadPropertyList;
this.jobProperties = jobProperties == null ? Maps.newHashMap() :
jobProperties;
this.typeName = typeName.toUpperCase();
- this.dataSourceProperties = dataSourceProperties;
+ this.dataSourceProperties = new
RoutineLoadDataSourceProperties(this.typeName, dataSourceProperties, false);
this.mergeType = mergeType;
}
@@ -278,25 +265,29 @@ public class CreateRoutineLoadStmt extends DdlStmt {
}
public String getKafkaBrokerList() {
- return kafkaBrokerList;
+ return this.dataSourceProperties.getKafkaBrokerList();
}
public String getKafkaTopic() {
- return kafkaTopic;
+ return this.dataSourceProperties.getKafkaTopic();
}
public List<Pair<Integer, Long>> getKafkaPartitionOffsets() {
- return kafkaPartitionOffsets;
+ return this.dataSourceProperties.getKafkaPartitionOffsets();
}
public Map<String, String> getCustomKafkaProperties() {
- return customKafkaProperties;
+ return this.dataSourceProperties.getCustomKafkaProperties();
}
public LoadTask.MergeType getMergeType() {
return mergeType;
}
+ public boolean isOffsetsForTimes() {
+ return this.dataSourceProperties.isOffsetsForTimes();
+ }
+
@Override
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
@@ -467,155 +458,8 @@ public class CreateRoutineLoadStmt extends DdlStmt {
}
}
- private void checkDataSourceProperties() throws AnalysisException {
- LoadDataSourceType type;
- try {
- type = LoadDataSourceType.valueOf(typeName);
- } catch (IllegalArgumentException e) {
- throw new AnalysisException("routine load job does not support
this type " + typeName);
- }
- switch (type) {
- case KAFKA:
- checkKafkaProperties();
- break;
- default:
- break;
- }
- }
-
- private void checkKafkaProperties() throws AnalysisException {
- Optional<String> optional = dataSourceProperties.keySet().stream()
- .filter(entity -> !KAFKA_PROPERTIES_SET.contains(entity))
- .filter(entity -> !entity.startsWith("property.")).findFirst();
- if (optional.isPresent()) {
- throw new AnalysisException(optional.get() + " is invalid kafka
custom property");
- }
-
- // check broker list
- kafkaBrokerList =
Strings.nullToEmpty(dataSourceProperties.get(KAFKA_BROKER_LIST_PROPERTY)).replaceAll("
", "");
- if (Strings.isNullOrEmpty(kafkaBrokerList)) {
- throw new AnalysisException(KAFKA_BROKER_LIST_PROPERTY + " is a
required property");
- }
- String[] kafkaBrokerList = this.kafkaBrokerList.split(",");
- for (String broker : kafkaBrokerList) {
- if (!Pattern.matches(ENDPOINT_REGEX, broker)) {
- throw new AnalysisException(KAFKA_BROKER_LIST_PROPERTY + ":" +
broker
- + " not match pattern " +
ENDPOINT_REGEX);
- }
- }
-
- // check topic
- kafkaTopic =
Strings.nullToEmpty(dataSourceProperties.get(KAFKA_TOPIC_PROPERTY)).replaceAll("
", "");
- if (Strings.isNullOrEmpty(kafkaTopic)) {
- throw new AnalysisException(KAFKA_TOPIC_PROPERTY + " is a required
property");
- }
-
- // check partitions
- String kafkaPartitionsString =
dataSourceProperties.get(KAFKA_PARTITIONS_PROPERTY);
- if (kafkaPartitionsString != null) {
- analyzeKafkaPartitionProperty(kafkaPartitionsString,
this.kafkaPartitionOffsets);
- }
-
- // check offset
- String kafkaOffsetsString =
dataSourceProperties.get(KAFKA_OFFSETS_PROPERTY);
- if (kafkaOffsetsString != null) {
- analyzeKafkaOffsetProperty(kafkaOffsetsString,
this.kafkaPartitionOffsets);
- }
-
- // check custom kafka property
- analyzeCustomProperties(this.dataSourceProperties,
this.customKafkaProperties);
- }
-
- public static void analyzeKafkaPartitionProperty(String
kafkaPartitionsString,
- List<Pair<Integer, Long>> kafkaPartitionOffsets) throws
AnalysisException {
- kafkaPartitionsString = kafkaPartitionsString.replaceAll(" ", "");
- if (kafkaPartitionsString.isEmpty()) {
- throw new AnalysisException(KAFKA_PARTITIONS_PROPERTY + " could
not be a empty string");
- }
- String[] kafkaPartitionsStringList = kafkaPartitionsString.split(",");
- for (String s : kafkaPartitionsStringList) {
- try {
-
kafkaPartitionOffsets.add(Pair.create(getIntegerValueFromString(s,
KAFKA_PARTITIONS_PROPERTY),
- KafkaProgress.OFFSET_END_VAL));
- } catch (AnalysisException e) {
- throw new AnalysisException(KAFKA_PARTITIONS_PROPERTY
- + " must be a number string with comma-separated");
- }
- }
- }
-
- public static void analyzeKafkaOffsetProperty(String kafkaOffsetsString,
- List<Pair<Integer, Long>> kafkaPartitionOffsets) throws
AnalysisException {
- kafkaOffsetsString = kafkaOffsetsString.replaceAll(" ", "");
- if (kafkaOffsetsString.isEmpty()) {
- throw new AnalysisException(KAFKA_OFFSETS_PROPERTY + " could not
be a empty string");
- }
- String[] kafkaOffsetsStringList = kafkaOffsetsString.split(",");
- if (kafkaOffsetsStringList.length != kafkaPartitionOffsets.size()) {
- throw new AnalysisException("Partitions number should be equals to
offsets number");
- }
-
- for (int i = 0; i < kafkaOffsetsStringList.length; i++) {
- // defined in librdkafka/rdkafkacpp.h
- // OFFSET_BEGINNING: -2
- // OFFSET_END: -1
- try {
- kafkaPartitionOffsets.get(i).second =
getLongValueFromString(kafkaOffsetsStringList[i],
- KAFKA_OFFSETS_PROPERTY);
- if (kafkaPartitionOffsets.get(i).second < 0) {
- throw new AnalysisException("Can not specify offset
smaller than 0");
- }
- } catch (AnalysisException e) {
- if
(kafkaOffsetsStringList[i].equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING)) {
- kafkaPartitionOffsets.get(i).second =
KafkaProgress.OFFSET_BEGINNING_VAL;
- } else if
(kafkaOffsetsStringList[i].equalsIgnoreCase(KafkaProgress.OFFSET_END)) {
- kafkaPartitionOffsets.get(i).second =
KafkaProgress.OFFSET_END_VAL;
- } else {
- throw e;
- }
- }
- }
- }
-
- public static void analyzeCustomProperties(Map<String, String>
dataSourceProperties,
- Map<String, String> customKafkaProperties) throws
AnalysisException {
- for (Map.Entry<String, String> dataSourceProperty :
dataSourceProperties.entrySet()) {
- if (dataSourceProperty.getKey().startsWith("property.")) {
- String propertyKey = dataSourceProperty.getKey();
- String propertyValue = dataSourceProperty.getValue();
- String propertyValueArr[] = propertyKey.split("\\.");
- if (propertyValueArr.length < 2) {
- throw new AnalysisException("kafka property value could
not be a empty string");
- }
-
customKafkaProperties.put(propertyKey.substring(propertyKey.indexOf(".") + 1),
propertyValue);
- }
- // can be extended in the future which other prefix
- }
- }
-
- private static int getIntegerValueFromString(String valueString, String
propertyName) throws AnalysisException {
- if (valueString.isEmpty()) {
- throw new AnalysisException(propertyName + " could not be a empty
string");
- }
- int value;
- try {
- value = Integer.valueOf(valueString);
- } catch (NumberFormatException e) {
- throw new AnalysisException(propertyName + " must be a integer");
- }
- return value;
- }
-
- private static long getLongValueFromString(String valueString, String
propertyName) throws AnalysisException {
- if (valueString.isEmpty()) {
- throw new AnalysisException(propertyName + " could not be a empty
string");
- }
- long value;
- try {
- value = Long.valueOf(valueString);
- } catch (NumberFormatException e) {
- throw new AnalysisException(propertyName + " must be a integer: "
+ valueString);
- }
- return value;
private void checkDataSourceProperties() throws UserException {
    // All source-specific validation is delegated to RoutineLoadDataSourceProperties.
    // The timezone must be injected first: analyze() asserts it is set, because
    // datetime-formatted offsets are converted to timestamps using this timezone.
    this.dataSourceProperties.setTimezone(this.timezone);
    this.dataSourceProperties.analyze();
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/RoutineLoadDataSourceProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/RoutineLoadDataSourceProperties.java
index 1743b2b..ca6f800 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/analysis/RoutineLoadDataSourceProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/analysis/RoutineLoadDataSourceProperties.java
@@ -19,42 +19,78 @@ package org.apache.doris.analysis;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.load.routineload.KafkaProgress;
import org.apache.doris.load.routineload.LoadDataSourceType;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
+import org.apache.commons.lang3.math.NumberUtils;
+
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.TimeZone;
+import java.util.regex.Pattern;
public class RoutineLoadDataSourceProperties {
- private static final ImmutableSet<String> CONFIGURABLE_PROPERTIES_SET =
new ImmutableSet.Builder<String>()
+
+ private static final ImmutableSet<String> DATA_SOURCE_PROPERTIES_SET = new
ImmutableSet.Builder<String>()
+ .add(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY)
+ .add(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY)
.add(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY)
.add(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY)
+ .add(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS)
.build();
-
- @SerializedName(value = "type")
- private String type = "KAFKA";
+
+ private static final ImmutableSet<String>
CONFIGURABLE_DATA_SOURCE_PROPERTIES_SET = new ImmutableSet.Builder<String>()
+ .add(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY)
+ .add(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY)
+ .add(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS)
+ .build();
+
// origin properties, no need to persist
private Map<String, String> properties = Maps.newHashMap();
+ private boolean isAlter = false;
+
+ @SerializedName(value = "type")
+ private String type = "KAFKA";
@SerializedName(value = "kafkaPartitionOffsets")
private List<Pair<Integer, Long>> kafkaPartitionOffsets =
Lists.newArrayList();
@SerializedName(value = "customKafkaProperties")
private Map<String, String> customKafkaProperties = Maps.newHashMap();
+ @SerializedName(value = "isOffsetsForTimes")
+ private boolean isOffsetsForTimes = false;
+ @SerializedName(value = "kafkaBrokerList")
+ private String kafkaBrokerList;
+ @SerializedName(value = "KafkaTopic")
+ private String kafkaTopic;
+ @SerializedName(value = "timezone")
+ private String timezone;
public RoutineLoadDataSourceProperties() {
    // No-arg constructor: used by unit tests and when altering a routine load
    // job without data source properties. Treated as an "alter" so that the
    // create-only requirements (broker list, topic) are not enforced.
    this.isAlter = true;
}

public RoutineLoadDataSourceProperties(String type, Map<String, String> properties, boolean isAlter) {
    // "type" is upper-cased to match LoadDataSourceType enum constant names.
    // "isAlter" switches validation between the create and alter property whitelists.
    this.type = type.toUpperCase();
    this.properties = properties;
    this.isAlter = isAlter;
}
public void analyze() throws UserException {
    // Nothing to analyze: an empty property map is rejected up front.
    if (properties.isEmpty()) {
        throw new AnalysisException("No properties");
    }
    // The timezone is needed to convert datetime-formatted offsets to timestamps,
    // so setTimezone() must have been called before analyze().
    Preconditions.checkState(!Strings.isNullOrEmpty(timezone), "timezone must be set before analyzing");
    checkDataSourceProperties();
}
@@ -70,11 +106,31 @@ public class RoutineLoadDataSourceProperties {
return kafkaPartitionOffsets;
}
// Replace the <partition, offset> list wholesale (used after resolving
// time-based offsets into real offsets).
public void setKafkaPartitionOffsets(List<Pair<Integer, Long>> kafkaPartitionOffsets) {
    this.kafkaPartitionOffsets = kafkaPartitionOffsets;
}

public Map<String, String> getCustomKafkaProperties() {
    return customKafkaProperties;
}

// Must be called before analyze(); see the precondition there.
public void setTimezone(String timezone) {
    this.timezone = timezone;
}

public String getKafkaBrokerList() {
    return kafkaBrokerList;
}

public String getKafkaTopic() {
    return kafkaTopic;
}

// True when kafkaPartitionOffsets holds timestamps instead of kafka offsets.
public boolean isOffsetsForTimes() {
    return isOffsetsForTimes;
}
+
+ private void checkDataSourceProperties() throws UserException {
LoadDataSourceType sourceType;
try {
sourceType = LoadDataSourceType.valueOf(type);
@@ -90,37 +146,242 @@ public class RoutineLoadDataSourceProperties {
}
}
- private void checkKafkaProperties() throws AnalysisException {
/*
 * Kafka properties include the following:
 * 1. broker list
 * 2. topic
 * 3. partition offset info
 * 4. other properties starting with "property."
 */
private void checkKafkaProperties() throws UserException {
    // When altering, only partition / offset / default-offset keys may be changed;
    // when creating, broker list and topic are also accepted.
    ImmutableSet<String> propertySet = isAlter ? CONFIGURABLE_DATA_SOURCE_PROPERTIES_SET : DATA_SOURCE_PROPERTIES_SET;
    Optional<String> optional = properties.keySet().stream().filter(
            entity -> !propertySet.contains(entity)).filter(
            entity -> !entity.startsWith("property.")).findFirst();
    if (optional.isPresent()) {
        throw new AnalysisException(optional.get() + " is invalid kafka property or can not be set");
    }

    // check broker list: required on create, optional on alter
    kafkaBrokerList = Strings.nullToEmpty(properties.get(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY)).replaceAll(" ", "");
    if (!isAlter && Strings.isNullOrEmpty(kafkaBrokerList)) {
        throw new AnalysisException(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY + " is a required property");
    }
    if (!Strings.isNullOrEmpty(kafkaBrokerList)) {
        // every "host:port" entry must match ENDPOINT_REGEX
        String[] kafkaBrokerList = this.kafkaBrokerList.split(",");
        for (String broker : kafkaBrokerList) {
            if (!Pattern.matches(CreateRoutineLoadStmt.ENDPOINT_REGEX, broker)) {
                throw new AnalysisException(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY + ":" + broker
                        + " not match pattern " + CreateRoutineLoadStmt.ENDPOINT_REGEX);
            }
        }
    }

    // check topic: required on create, optional on alter
    kafkaTopic = Strings.nullToEmpty(properties.get(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY)).replaceAll(" ", "");
    if (!isAlter && Strings.isNullOrEmpty(kafkaTopic)) {
        throw new AnalysisException(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY + " is a required property");
    }

    // check custom kafka property
    // This should be done before checking partition and offsets, because we need KAFKA_DEFAULT_OFFSETS,
    // which is in custom properties.
    analyzeCustomProperties(this.properties, this.customKafkaProperties);

    // The partition offset properties are all optional,
    // and there are 5 valid cases for specifying partition offsets:
    //   A. partition, offset and default offset are not set
    //      Doris will set default offset to OFFSET_END
    //   B. partition and offset are set, default offset is not set
    //      fill the "kafkaPartitionOffsets" with partition and offset
    //   C. partition and default offset are set, offset is not set
    //      fill the "kafkaPartitionOffsets" with partition and default offset
    //   D. partition is set, offset and default offset are not set
    //      this is only valid when doing create routine load operation,
    //      fill the "kafkaPartitionOffsets" with partition and OFFSET_END
    //   E. only default offset is set.
    //      this is only valid when doing alter routine load operation.
    // Other cases are illegal.

    // check partitions
    String kafkaPartitionsString = properties.get(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY);
    if (kafkaPartitionsString != null) {
        analyzeKafkaPartitionProperty(kafkaPartitionsString, this.kafkaPartitionOffsets);
    }

    // check offset
    String kafkaOffsetsString = properties.get(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY);
    String kafkaDefaultOffsetString = customKafkaProperties.get(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS);
    if (kafkaOffsetsString != null && kafkaDefaultOffsetString != null) {
        // explicit per-partition offsets and a default offset are mutually exclusive
        throw new AnalysisException("Only one of " + CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY +
                " and " + CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS + " can be set.");
    }
    if (isAlter && kafkaPartitionsString != null && kafkaOffsetsString == null && kafkaDefaultOffsetString == null) {
        // if this is an alter operation, the partition and (default)offset must be set together.
        throw new AnalysisException("Must set offset or default offset with partition property");
    }

    if (kafkaOffsetsString != null) {
        // Case B (offsets may be numbers, OFFSET_BEGINNING/OFFSET_END, or datetimes)
        this.isOffsetsForTimes = analyzeKafkaOffsetProperty(kafkaOffsetsString, this.kafkaPartitionOffsets, this.timezone);
    } else {
        // offset is not set, check default offset (falls back to OFFSET_END — Cases A/D/E).
        this.isOffsetsForTimes = analyzeKafkaDefaultOffsetProperty(this.customKafkaProperties, this.timezone);
        if (!this.kafkaPartitionOffsets.isEmpty()) {
            // Case C: re-read the default offset because the analysis above may have
            // rewritten a datetime value into a timestamp string.
            kafkaDefaultOffsetString = customKafkaProperties.get(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS);
            setDefaultOffsetForPartition(this.kafkaPartitionOffsets, kafkaDefaultOffsetString, this.isOffsetsForTimes);
        }
    }
}
-
CreateRoutineLoadStmt.analyzeKafkaPartitionProperty(kafkaPartitionsString,
kafkaPartitionOffsets);
+ private static void setDefaultOffsetForPartition(List<Pair<Integer, Long>>
kafkaPartitionOffsets,
+ String
kafkaDefaultOffsetString, boolean isOffsetsForTimes) {
+ if (isOffsetsForTimes) {
+ for (Pair<Integer, Long> pair : kafkaPartitionOffsets) {
+ pair.second = Long.valueOf(kafkaDefaultOffsetString);
+ }
} else {
- if
(properties.containsKey(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY)) {
- throw new AnalysisException("Missing kafka partition info");
+ for (Pair<Integer, Long> pair : kafkaPartitionOffsets) {
+ if
(kafkaDefaultOffsetString.equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING)) {
+ pair.second = KafkaProgress.OFFSET_BEGINNING_VAL;
+ } else {
+ pair.second = KafkaProgress.OFFSET_END_VAL;
+ }
}
}
+ }
- // check offset
- String kafkaOffsetsString =
properties.get(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY);
- if (kafkaOffsetsString != null) {
-
CreateRoutineLoadStmt.analyzeKafkaOffsetProperty(kafkaOffsetsString,
kafkaPartitionOffsets);
// If the default offset is not set, set the default offset to OFFSET_END.
// If the offset is in datetime format, convert it to a timestamp, and also save the
// origin datetime-formatted offset in "customKafkaProperties" under
// KAFKA_ORIGIN_DEFAULT_OFFSETS (the timestamp alone loses the original user input).
// Return true if the offset is in datetime format.
private static boolean analyzeKafkaDefaultOffsetProperty(Map<String, String> customKafkaProperties, String timeZoneStr)
        throws AnalysisException {
    // Case A/D fallback: no default offset given means "consume from the end".
    customKafkaProperties.putIfAbsent(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS, KafkaProgress.OFFSET_END);
    String defaultOffsetStr = customKafkaProperties.get(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS);
    TimeZone timeZone = TimeUtils.getOrSystemTimeZone(timeZoneStr);
    // timeStringToLong returns -1 when the value is not a "yyyy-MM-dd HH:mm:ss" datetime.
    long defaultOffset = TimeUtils.timeStringToLong(defaultOffsetStr, timeZone);
    if (defaultOffset != -1) {
        // this is a datetime format offset
        customKafkaProperties.put(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS, String.valueOf(defaultOffset));
        // we convert datetime to timestamp, and save the origin datetime formatted offset for further use.
        customKafkaProperties.put(CreateRoutineLoadStmt.KAFKA_ORIGIN_DEFAULT_OFFSETS, defaultOffsetStr);
        return true;
    } else {
        // Not a datetime: only the two symbolic offsets are legal
        // (numeric default offsets are intentionally not supported here).
        if (!defaultOffsetStr.equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING) && !defaultOffsetStr.equalsIgnoreCase(KafkaProgress.OFFSET_END)) {
            throw new AnalysisException(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS + " can only be set to OFFSET_BEGINNING, OFFSET_END or date time");
        }
        return false;
    }
}
+
+ // init "kafkaPartitionOffsets" with partition property.
+ // The offset will be set to OFFSET_END for now, and will be changed in
later analysis process.
+ private static void analyzeKafkaPartitionProperty(String
kafkaPartitionsString,
+ List<Pair<Integer,
Long>> kafkaPartitionOffsets) throws AnalysisException {
+ kafkaPartitionsString = kafkaPartitionsString.replaceAll(" ", "");
+ if (kafkaPartitionsString.isEmpty()) {
+ throw new
AnalysisException(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY + " could not
be a empty string");
+ }
+ String[] kafkaPartitionsStringList = kafkaPartitionsString.split(",");
+ for (String s : kafkaPartitionsStringList) {
+ try {
+
kafkaPartitionOffsets.add(Pair.create(getIntegerValueFromString(s,
CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY),
+ KafkaProgress.OFFSET_END_VAL));
+ } catch (AnalysisException e) {
+ throw new
AnalysisException(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY
+ + " must be a number string with comma-separated");
+ }
+ }
+ }
+
+ // Fill the partition's offset with given kafkaOffsetsString,
+ // Return true if offset is specified by timestamp.
+ private static boolean analyzeKafkaOffsetProperty(String
kafkaOffsetsString, List<Pair<Integer, Long>> kafkaPartitionOffsets,
+ String timeZoneStr)
+ throws UserException {
+ if (Strings.isNullOrEmpty(kafkaOffsetsString)) {
+ throw new
AnalysisException(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY + " could not be
a empty string");
+ }
+ List<String> kafkaOffsetsStringList =
Splitter.on(",").trimResults().splitToList(kafkaOffsetsString);
+ if (kafkaOffsetsStringList.size() != kafkaPartitionOffsets.size()) {
+ throw new AnalysisException("Partitions number should be equals to
offsets number");
}
- // check custom properties
- CreateRoutineLoadStmt.analyzeCustomProperties(properties,
customKafkaProperties);
+ // We support two ways to specify the offset,
+ // one is to specify the offset directly, the other is to specify a
timestamp.
+ // Doris will get the offset of the corresponding partition through
the timestamp.
+ // The user can only choose one of these methods.
+ boolean foundTime = false;
+ boolean foundOffset = false;
+ for (String kafkaOffsetsStr : kafkaOffsetsStringList) {
+ if (TimeUtils.timeStringToLong(kafkaOffsetsStr) != -1) {
+ foundTime = true;
+ } else {
+ foundOffset = true;
+ }
+ }
+ if (foundTime && foundOffset) {
+ throw new AnalysisException("The offset of the partition cannot be
specified by the timestamp " +
+ "and the offset at the same time");
+ }
+
+ if (foundTime) {
+ // convert all datetime strs to timestamps
+ // and set them as the partition's offset.
+ // These timestamps will be converted to real offset when job is
running.
+ TimeZone timeZone = TimeUtils.getOrSystemTimeZone(timeZoneStr);
+ for (int i = 0; i < kafkaOffsetsStringList.size(); i++) {
+ String kafkaOffsetsStr = kafkaOffsetsStringList.get(i);
+ long timestamp = TimeUtils.timeStringToLong(kafkaOffsetsStr,
timeZone);
+ Preconditions.checkState(timestamp != -1);
+ kafkaPartitionOffsets.get(i).second = timestamp;
+ }
+ } else {
+ for (int i = 0; i < kafkaOffsetsStringList.size(); i++) {
+ String kafkaOffsetsStr = kafkaOffsetsStringList.get(i);
+ if
(kafkaOffsetsStr.equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING)) {
+ kafkaPartitionOffsets.get(i).second =
KafkaProgress.OFFSET_BEGINNING_VAL;
+ } else if
(kafkaOffsetsStr.equalsIgnoreCase(KafkaProgress.OFFSET_END)) {
+ kafkaPartitionOffsets.get(i).second =
KafkaProgress.OFFSET_END_VAL;
+ } else if (NumberUtils.isDigits(kafkaOffsetsStr)) {
+ kafkaPartitionOffsets.get(i).second =
Long.valueOf(NumberUtils.toLong(kafkaOffsetsStr));
+ } else {
+ throw new
AnalysisException(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY + " must be an
integer or a date time");
+ }
+ }
+ }
+
+ return foundTime;
+ }
+
+ private static void analyzeCustomProperties(Map<String, String>
dataSourceProperties,
+ Map<String, String>
customKafkaProperties) throws AnalysisException {
+ for (Map.Entry<String, String> dataSourceProperty :
dataSourceProperties.entrySet()) {
+ if (dataSourceProperty.getKey().startsWith("property.")) {
+ String propertyKey = dataSourceProperty.getKey();
+ String propertyValue = dataSourceProperty.getValue();
+ String propertyValueArr[] = propertyKey.split("\\.");
+ if (propertyValueArr.length < 2) {
+ throw new AnalysisException("kafka property value could
not be a empty string");
+ }
+
customKafkaProperties.put(propertyKey.substring(propertyKey.indexOf(".") + 1),
propertyValue);
+ }
+ // can be extended in the future which other prefix
+ }
+ }
+
+ private static int getIntegerValueFromString(String valueString, String
propertyName) throws AnalysisException {
+ if (valueString.isEmpty()) {
+ throw new AnalysisException(propertyName + " could not be a empty
string");
+ }
+ int value;
+ try {
+ value = Integer.valueOf(valueString);
+ } catch (NumberFormatException e) {
+ throw new AnalysisException(propertyName + " must be a integer");
+ }
+ return value;
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java
index 4ecb520..6691af6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java
@@ -20,6 +20,7 @@ package org.apache.doris.common.util;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.LoadException;
+import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.proto.InternalService;
import org.apache.doris.rpc.BackendServiceProxy;
@@ -28,6 +29,8 @@ import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStatusCode;
+import com.google.common.collect.Lists;
+
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -93,5 +96,76 @@ public class KafkaUtil {
}
}
}
+
+ // Get offsets by times.
+ // The input parameter "timestampOffsets" is <partition, timestamp>
+ // Tne return value is <partition, offset>
+ public static List<Pair<Integer, Long>> getOffsetsForTimes(String
brokerList, String topic,
+ Map<String,
String> convertedCustomProperties,
+
List<Pair<Integer, Long>> timestampOffsets) throws LoadException {
+ BackendService.Client client = null;
+ TNetworkAddress address = null;
+ LOG.debug("begin to get offsets for times of topic: {}, {}", topic,
timestampOffsets);
+ boolean ok = false;
+ try {
+ List<Long> backendIds =
Catalog.getCurrentSystemInfo().getBackendIds(true);
+ if (backendIds.isEmpty()) {
+ throw new LoadException("Failed to get offset for times. No
alive backends");
+ }
+ Collections.shuffle(backendIds);
+ Backend be =
Catalog.getCurrentSystemInfo().getBackend(backendIds.get(0));
+ address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
+
+ // create request
+ InternalService.PKafkaMetaProxyRequest.Builder metaRequestBuilder =
+ InternalService.PKafkaMetaProxyRequest.newBuilder()
+
.setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
+ .setBrokers(brokerList)
+ .setTopic(topic)
+ .addAllProperties(
+
convertedCustomProperties.entrySet().stream().map(
+ e ->
InternalService.PStringPair.newBuilder()
+ .setKey(e.getKey())
+
.setVal(e.getValue())
+ .build()
+ ).collect(Collectors.toList())
+ )
+ );
+ for (Pair<Integer, Long> pair : timestampOffsets) {
+
metaRequestBuilder.addOffsetTimes(InternalService.PIntegerPair.newBuilder().setKey(pair.first)
+ .setVal(pair.second).build());
+ }
+
+ InternalService.PProxyRequest request =
InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
+ metaRequestBuilder).build();
+
+ // get info
+ Future<InternalService.PProxyResult> future =
BackendServiceProxy.getInstance().getInfo(address, request);
+ InternalService.PProxyResult result = future.get(5,
TimeUnit.SECONDS);
+ TStatusCode code =
TStatusCode.findByValue(result.getStatus().getStatusCode());
+ if (code != TStatusCode.OK) {
+ throw new UserException("failed to get kafka partition info: "
+ result.getStatus().getErrorMsgsList());
+ } else {
+ List<InternalService.PIntegerPair> pairs =
result.getPartitionOffsets().getOffsetTimesList();
+ List<Pair<Integer, Long>> partitionOffsets =
Lists.newArrayList();
+ for (InternalService.PIntegerPair pair : pairs) {
+ partitionOffsets.add(Pair.create(pair.getKey(),
pair.getVal()));
+ }
+ LOG.debug("finish to get offsets for times of topic: {}, {}",
topic, partitionOffsets);
+ return partitionOffsets;
+ }
+ } catch (Exception e) {
+ LOG.warn("failed to get offsets for times.", e);
+ throw new LoadException(
+ "Failed to get offsets for times of kafka topic: " + topic
+ ". error: " + e.getMessage());
+ } finally {
+ if (ok) {
+ ClientPool.backendPool.returnObject(address, client);
+ } else {
+ ClientPool.backendPool.invalidateObject(address, client);
+ }
+ }
+ }
}
+
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java
index 85901be..4feded8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java
@@ -233,10 +233,6 @@ public class TimeUtils {
}
}
- public static long dateTransform(long time, Type type) {
- return dateTransform(time, type.getPrimitiveType());
- }
-
public static long timeStringToLong(String timeStr) {
Date d;
try {
@@ -247,6 +243,18 @@ public class TimeUtils {
return d.getTime();
}
+ public static long timeStringToLong(String timeStr, TimeZone timeZone) {
+ SimpleDateFormat dateFormatTimeZone = new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss");
+ dateFormatTimeZone.setTimeZone(timeZone);
+ Date d;
+ try {
+ d = dateFormatTimeZone.parse(timeStr);
+ } catch (ParseException e) {
+ return -1;
+ }
+ return d.getTime();
+ }
+
// Check if the time zone_value is valid
public static String checkTimeZoneValidAndStandardize(String value) throws
DdlException {
try {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 856ae0d..e667515 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -40,14 +40,12 @@ import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.common.util.SmallFileMgr;
import org.apache.doris.common.util.SmallFileMgr.SmallFile;
+import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionStatus;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -55,6 +53,9 @@ import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -62,6 +63,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.TimeZone;
import java.util.UUID;
/**
@@ -80,6 +82,9 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
// current kafka partitions is the actually partition which will be fetched
private List<Integer> currentKafkaPartitions = Lists.newArrayList();
// optional, user want to set default offset when new partition add or
offset not set.
+ // kafkaDefaultOffSet has two formats, one is the time format, eg:
"2021-10-10 11:00:00",
+ // the other is string value, including OFFSET_END and OFFSET_BEGINNING.
+ // We should check it by calling isOffsetForTimes() method before use it.
private String kafkaDefaultOffSet = "";
// kafka properties ,property prefix will be mapped to kafka custom
parameters, which can be extended in the future
private Map<String, String> customProperties = Maps.newHashMap();
@@ -110,6 +115,16 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
return convertedCustomProperties;
}
+ private boolean isOffsetForTimes() {
+ long offset = TimeUtils.timeStringToLong(this.kafkaDefaultOffSet);
+ return offset != -1;
+ }
+
+ private long convertedDefaultOffsetToTimestamp() {
+ TimeZone timeZone = TimeUtils.getOrSystemTimeZone(getTimezone());
+ return TimeUtils.timeStringToLong(this.kafkaDefaultOffSet, timeZone);
+ }
+
@Override
public void prepare() throws UserException {
super.prepare();
@@ -142,7 +157,14 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
convertedCustomProperties.put(entry.getKey(),
entry.getValue());
}
}
- if
(convertedCustomProperties.containsKey(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS))
{
+
+ // This is mainly for compatibility. In the previous version, we
directly obtained the value of the
+ // KAFKA_DEFAULT_OFFSETS attribute. In the new version, we support
date time as the value of KAFKA_DEFAULT_OFFSETS,
+ // and this attribute will be converted into a timestamp during the
analyzing phase, thus losing some information.
+ // So we use KAFKA_ORIGIN_DEFAULT_OFFSETS to store the original
datetime formatted KAFKA_DEFAULT_OFFSETS value
+ if
(convertedCustomProperties.containsKey(CreateRoutineLoadStmt.KAFKA_ORIGIN_DEFAULT_OFFSETS))
{
+ kafkaDefaultOffSet =
convertedCustomProperties.remove(CreateRoutineLoadStmt.KAFKA_ORIGIN_DEFAULT_OFFSETS);
+ } else if
(convertedCustomProperties.containsKey(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS))
{
kafkaDefaultOffSet =
convertedCustomProperties.remove(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS);
}
}
@@ -254,7 +276,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
}
@Override
- protected void unprotectUpdateProgress() {
+ protected void unprotectUpdateProgress() throws UserException {
updateNewPartitionProgress();
}
@@ -389,14 +411,21 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
}
}
- private void updateNewPartitionProgress() {
+ private void updateNewPartitionProgress() throws LoadException {
// update the progress of new partitions
for (Integer kafkaPartition : currentKafkaPartitions) {
if (!((KafkaProgress) progress).containsPartition(kafkaPartition))
{
// if offset is not assigned, start from OFFSET_END
long beginOffSet = KafkaProgress.OFFSET_END_VAL;
if (!kafkaDefaultOffSet.isEmpty()) {
- if
(kafkaDefaultOffSet.equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING)) {
+ if (isOffsetForTimes()) {
+ // get offset by time
+ List<Pair<Integer, Long>> offsets =
Lists.newArrayList();
+ offsets.add(Pair.create(kafkaPartition,
convertedDefaultOffsetToTimestamp()));
+ offsets =
KafkaUtil.getOffsetsForTimes(this.brokerList, this.topic,
convertedCustomProperties, offsets);
+ Preconditions.checkState(offsets.size() == 1);
+ beginOffSet = offsets.get(0).second;
+ } else if
(kafkaDefaultOffSet.equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING)) {
beginOffSet = KafkaProgress.OFFSET_BEGINNING_VAL;
} else if
(kafkaDefaultOffSet.equalsIgnoreCase(KafkaProgress.OFFSET_END)) {
beginOffSet = KafkaProgress.OFFSET_END_VAL;
@@ -420,7 +449,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
super.setOptional(stmt);
if (!stmt.getKafkaPartitionOffsets().isEmpty()) {
- setCustomKafkaPartitions(stmt.getKafkaPartitionOffsets());
+ setCustomKafkaPartitions(stmt);
}
if (!stmt.getCustomKafkaProperties().isEmpty()) {
setCustomKafkaProperties(stmt.getCustomKafkaProperties());
@@ -428,7 +457,15 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
}
// this is an unprotected method which is called in the initialization
function
- private void setCustomKafkaPartitions(List<Pair<Integer, Long>>
kafkaPartitionOffsets) throws LoadException {
+ private void setCustomKafkaPartitions(CreateRoutineLoadStmt stmt) throws
LoadException {
+ List<Pair<Integer, Long>> kafkaPartitionOffsets =
stmt.getKafkaPartitionOffsets();
+ boolean isForTimes = stmt.isOffsetsForTimes();
+ if (isForTimes) {
+ // the offset is set by date time, we need to get the real offset
by time
+ kafkaPartitionOffsets =
KafkaUtil.getOffsetsForTimes(stmt.getKafkaBrokerList(), stmt.getKafkaTopic(),
+ convertedCustomProperties,
stmt.getKafkaPartitionOffsets());
+ }
+
for (Pair<Integer, Long> partitionOffset : kafkaPartitionOffsets) {
this.customKafkaPartitions.add(partitionOffset.first);
((KafkaProgress) progress).addPartitionOffset(partitionOffset);
@@ -497,9 +534,13 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
}
@Override
- public void modifyProperties(AlterRoutineLoadStmt stmt) throws
DdlException {
+ public void modifyProperties(AlterRoutineLoadStmt stmt) throws
UserException {
Map<String, String> jobProperties = stmt.getAnalyzedJobProperties();
RoutineLoadDataSourceProperties dataSourceProperties =
stmt.getDataSourceProperties();
+ if (dataSourceProperties.isOffsetsForTimes()) {
+ // if the partition offset is set by timestamp, convert it to real
offset
+ convertTimestampToOffset(dataSourceProperties);
+ }
writeLock();
try {
@@ -507,6 +548,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
throw new DdlException("Only supports modification of PAUSED
jobs");
}
+
modifyPropertiesInternal(jobProperties, dataSourceProperties);
AlterRoutineLoadJobOperationLog log = new
AlterRoutineLoadJobOperationLog(this.id,
@@ -517,6 +559,16 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
}
}
+ private void convertTimestampToOffset(RoutineLoadDataSourceProperties
dataSourceProperties) throws UserException {
+ List<Pair<Integer, Long>> partitionOffsets =
dataSourceProperties.getKafkaPartitionOffsets();
+ if (partitionOffsets.isEmpty()) {
+ return;
+ }
+ List<Pair<Integer, Long>> newOffsets =
KafkaUtil.getOffsetsForTimes(brokerList, topic,
+ convertedCustomProperties, partitionOffsets);
+ dataSourceProperties.setKafkaPartitionOffsets(newOffsets);
+ }
+
private void modifyPropertiesInternal(Map<String, String> jobProperties,
RoutineLoadDataSourceProperties
dataSourceProperties)
throws DdlException {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 96ddf79..3ca3825 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -67,9 +67,6 @@ import org.apache.doris.transaction.TransactionException;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionStatus;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
@@ -79,6 +76,9 @@ import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -1269,7 +1269,7 @@ public abstract class RoutineLoadJob extends
AbstractTxnStateChangeCallback impl
}
}
- protected void unprotectUpdateProgress() {
+ protected void unprotectUpdateProgress() throws UserException {
}
protected boolean unprotectNeedReschedule() throws UserException {
@@ -1544,7 +1544,7 @@ public abstract class RoutineLoadJob extends
AbstractTxnStateChangeCallback impl
}
}
- abstract public void modifyProperties(AlterRoutineLoadStmt stmt) throws
DdlException;
+ abstract public void modifyProperties(AlterRoutineLoadStmt stmt) throws
UserException;
abstract public void
replayModifyProperties(AlterRoutineLoadJobOperationLog log);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index 682d3a1..d89bde7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -205,7 +205,7 @@ public class RoutineLoadManager implements Writable {
return false;
}
- private RoutineLoadJob checkPrivAndGetJob(String dbName, String jobName)
+ public RoutineLoadJob checkPrivAndGetJob(String dbName, String jobName)
throws MetaNotFoundException, DdlException, AnalysisException {
RoutineLoadJob routineLoadJob = getJob(dbName, jobName);
if (routineLoadJob == null) {
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/analysis/AlterRoutineLoadStmtTest.java
b/fe/fe-core/src/test/java/org/apache/doris/analysis/AlterRoutineLoadStmtTest.java
index 4246b30..f9ddb688 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/analysis/AlterRoutineLoadStmtTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/analysis/AlterRoutineLoadStmtTest.java
@@ -18,6 +18,7 @@
package org.apache.doris.analysis;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
import org.apache.doris.mysql.privilege.PaloAuth;
import org.apache.doris.mysql.privilege.PrivPredicate;
@@ -49,7 +50,7 @@ public class AlterRoutineLoadStmtTest {
@Before
public void setUp() {
analyzer = AccessTestUtil.fetchAdminAnalyzer(false);
-
+ FeConstants.runningUnitTest = true;
new Expectations() {
{
auth.checkGlobalPriv((ConnectContext) any, (PrivPredicate)
any);
@@ -80,7 +81,7 @@ public class AlterRoutineLoadStmtTest {
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY,
"1,2,3");
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "10000,
20000, 30000");
RoutineLoadDataSourceProperties routineLoadDataSourceProperties =
new RoutineLoadDataSourceProperties(
- typeName, dataSourceProperties);
+ typeName, dataSourceProperties, true);
AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new
LabelName("db1", "label1"),
jobProperties, routineLoadDataSourceProperties);
try {
@@ -131,7 +132,7 @@ public class AlterRoutineLoadStmtTest {
Map<String, String> dataSourceProperties = Maps.newHashMap();
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY,
"new_topic");
RoutineLoadDataSourceProperties routineLoadDataSourceProperties =
new RoutineLoadDataSourceProperties(
- typeName, dataSourceProperties);
+ typeName, dataSourceProperties, true);
AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new
LabelName("db1", "label1"),
jobProperties, routineLoadDataSourceProperties);
@@ -139,8 +140,9 @@ public class AlterRoutineLoadStmtTest {
stmt.analyze(analyzer);
Assert.fail();
} catch (AnalysisException e) {
- Assert.assertTrue(e.getMessage().contains("kafka_topic is
invalid kafka custom property"));
+ Assert.assertTrue(e.getMessage().contains("kafka_topic is
invalid kafka property"));
} catch (UserException e) {
+ e.printStackTrace();
Assert.fail();
}
}
@@ -152,14 +154,14 @@ public class AlterRoutineLoadStmtTest {
Map<String, String> dataSourceProperties = Maps.newHashMap();
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY,
"1,2,3");
RoutineLoadDataSourceProperties routineLoadDataSourceProperties =
new RoutineLoadDataSourceProperties(
- typeName, dataSourceProperties);
+ typeName, dataSourceProperties, true);
AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new
LabelName("db1", "label1"),
jobProperties, routineLoadDataSourceProperties);
try {
stmt.analyze(analyzer);
Assert.fail();
} catch (AnalysisException e) {
- Assert.assertTrue(e.getMessage().contains("Partition and
offset must be specified at the same time"));
+ Assert.assertTrue(e.getMessage().contains("Must set offset or
default offset with partition property"));
} catch (UserException e) {
Assert.fail();
}
@@ -173,7 +175,7 @@ public class AlterRoutineLoadStmtTest {
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY,
"1,2,3");
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "1000,
2000");
RoutineLoadDataSourceProperties routineLoadDataSourceProperties =
new RoutineLoadDataSourceProperties(
- typeName, dataSourceProperties);
+ typeName, dataSourceProperties, true);
AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new
LabelName("db1", "label1"),
jobProperties, routineLoadDataSourceProperties);
try {
@@ -193,14 +195,14 @@ public class AlterRoutineLoadStmtTest {
Map<String, String> dataSourceProperties = Maps.newHashMap();
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "1000,
2000, 3000");
RoutineLoadDataSourceProperties routineLoadDataSourceProperties =
new RoutineLoadDataSourceProperties(
- typeName, dataSourceProperties);
+ typeName, dataSourceProperties, true);
AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new
LabelName("db1", "label1"),
jobProperties, routineLoadDataSourceProperties);
try {
stmt.analyze(analyzer);
Assert.fail();
} catch (AnalysisException e) {
- Assert.assertTrue(e.getMessage().contains("Missing kafka
partition info"));
+ Assert.assertTrue(e.getMessage().contains("Partitions number
should be equals to offsets number"));
} catch (UserException e) {
Assert.fail();
}
@@ -217,7 +219,7 @@ public class AlterRoutineLoadStmtTest {
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY,
"1,2,3");
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "10000,
20000, 30000");
RoutineLoadDataSourceProperties routineLoadDataSourceProperties =
new RoutineLoadDataSourceProperties(
- typeName, dataSourceProperties);
+ typeName, dataSourceProperties, true);
AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new
LabelName("db1", "label1"),
jobProperties, routineLoadDataSourceProperties);
try {
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/analysis/RoutineLoadDataSourcePropertiesTest.java
b/fe/fe-core/src/test/java/org/apache/doris/analysis/RoutineLoadDataSourcePropertiesTest.java
new file mode 100644
index 0000000..bcadc90
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/analysis/RoutineLoadDataSourcePropertiesTest.java
@@ -0,0 +1,347 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.load.routineload.KafkaProgress;
+
+import com.google.common.collect.Maps;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+
+public class RoutineLoadDataSourcePropertiesTest {
+
+ @Test
+ public void testCreateNormal() throws UserException {
+ // normal
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY,
"127.0.0.1:8080");
+ properties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, "test");
+ properties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "0, 1,
2");
+ properties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "100,
101, 102");
+ RoutineLoadDataSourceProperties dsProperties = new
RoutineLoadDataSourceProperties("KAFKA", properties, false);
+ dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE);
+ try {
+ dsProperties.analyze();
+ Assert.assertEquals("127.0.0.1:8080",
dsProperties.getKafkaBrokerList());
+ Assert.assertEquals("test", dsProperties.getKafkaTopic());
+ List<Pair<Integer, Long>> partitinOffsets =
dsProperties.getKafkaPartitionOffsets();
+ Assert.assertEquals(3, partitinOffsets.size());
+ Assert.assertEquals(Integer.valueOf(0),
partitinOffsets.get(0).first);
+ Assert.assertEquals(Integer.valueOf(1),
partitinOffsets.get(1).first);
+ Assert.assertEquals(Integer.valueOf(2),
partitinOffsets.get(2).first);
+ Assert.assertEquals(Long.valueOf(100),
partitinOffsets.get(0).second);
+ Assert.assertEquals(Long.valueOf(101),
partitinOffsets.get(1).second);
+ Assert.assertEquals(Long.valueOf(102),
partitinOffsets.get(2).second);
+ Assert.assertFalse(dsProperties.isOffsetsForTimes());
+ } catch (AnalysisException e) {
+ Assert.fail(e.getMessage());
+ }
+
+ // normal, with datetime
+ properties = Maps.newHashMap();
+ properties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY,
"127.0.0.1:8080");
+ properties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, "test");
+ properties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "0, 1,
2");
+ properties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY,
"2021-10-10 11:00:00, 2021-10-10 11:00:00, 2021-10-10 12:00:00");
+ dsProperties = new RoutineLoadDataSourceProperties("KAFKA",
properties, false);
+ dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE);
+ try {
+ dsProperties.analyze();
+ Assert.assertEquals("127.0.0.1:8080",
dsProperties.getKafkaBrokerList());
+ Assert.assertEquals("test", dsProperties.getKafkaTopic());
+ List<Pair<Integer, Long>> partitinOffsets =
dsProperties.getKafkaPartitionOffsets();
+ Assert.assertEquals(3, partitinOffsets.size());
+ Assert.assertEquals(Integer.valueOf(0),
partitinOffsets.get(0).first);
+ Assert.assertEquals(Integer.valueOf(1),
partitinOffsets.get(1).first);
+ Assert.assertEquals(Integer.valueOf(2),
partitinOffsets.get(2).first);
+ Assert.assertEquals(Long.valueOf(1633834800000L),
partitinOffsets.get(0).second);
+ Assert.assertEquals(Long.valueOf(1633834800000L),
partitinOffsets.get(1).second);
+ Assert.assertEquals(Long.valueOf(1633838400000L),
partitinOffsets.get(2).second);
+ Assert.assertTrue(dsProperties.isOffsetsForTimes());
+ } catch (AnalysisException e) {
+ Assert.fail(e.getMessage());
+ }
+
+ // normal, with default offset as datetime
+ properties = Maps.newHashMap();
+ properties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY,
"127.0.0.1:8080");
+ properties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, "test");
+ properties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "0, 1,
2");
+ properties.put("property." +
CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS, "2020-01-10 00:00:00");
+ dsProperties = new RoutineLoadDataSourceProperties("KAFKA",
properties, false);
+ dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE);
+ try {
+ dsProperties.analyze();
+ Assert.assertEquals("127.0.0.1:8080",
dsProperties.getKafkaBrokerList());
+ Assert.assertEquals("test", dsProperties.getKafkaTopic());
+ List<Pair<Integer, Long>> partitinOffsets =
dsProperties.getKafkaPartitionOffsets();
+ Assert.assertEquals(3, partitinOffsets.size());
+ Assert.assertEquals(Integer.valueOf(0),
partitinOffsets.get(0).first);
+ Assert.assertEquals(Integer.valueOf(1),
partitinOffsets.get(1).first);
+ Assert.assertEquals(Integer.valueOf(2),
partitinOffsets.get(2).first);
+ Assert.assertEquals(Long.valueOf(1578585600000L),
partitinOffsets.get(0).second);
+ Assert.assertEquals(Long.valueOf(1578585600000L),
partitinOffsets.get(1).second);
+ Assert.assertEquals(Long.valueOf(1578585600000L),
partitinOffsets.get(2).second);
+ Assert.assertEquals(2,
dsProperties.getCustomKafkaProperties().size());
+ Assert.assertEquals("1578585600000",
dsProperties.getCustomKafkaProperties().get(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS));
+ Assert.assertEquals("2020-01-10 00:00:00",
dsProperties.getCustomKafkaProperties().get(CreateRoutineLoadStmt.KAFKA_ORIGIN_DEFAULT_OFFSETS));
+ Assert.assertTrue(dsProperties.isOffsetsForTimes());
+ } catch (AnalysisException e) {
+ Assert.fail(e.getMessage());
+ }
+
+ // normal, only set default offset as datetime
+ properties = Maps.newHashMap();
+ properties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY,
"127.0.0.1:8080");
+ properties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, "test");
+ properties.put("property." +
CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS, "2020-01-10 00:00:00");
+ dsProperties = new RoutineLoadDataSourceProperties("KAFKA",
properties, false);
+ dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE);
+ try {
+ dsProperties.analyze();
+ Assert.assertEquals("127.0.0.1:8080",
dsProperties.getKafkaBrokerList());
+ Assert.assertEquals("test", dsProperties.getKafkaTopic());
+ List<Pair<Integer, Long>> partitinOffsets =
dsProperties.getKafkaPartitionOffsets();
+ Assert.assertEquals(0, partitinOffsets.size());
+ Assert.assertEquals(2,
dsProperties.getCustomKafkaProperties().size());
+ Assert.assertEquals("1578585600000",
dsProperties.getCustomKafkaProperties().get(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS));
+ Assert.assertEquals("2020-01-10 00:00:00",
dsProperties.getCustomKafkaProperties().get(CreateRoutineLoadStmt.KAFKA_ORIGIN_DEFAULT_OFFSETS));
+ } catch (AnalysisException e) {
+ Assert.fail(e.getMessage());
+ }
+
+ // normal, only set default offset as integer
+ properties = Maps.newHashMap();
+ properties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY,
"127.0.0.1:8080");
+ properties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, "test");
+ properties.put("property." +
CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS, KafkaProgress.OFFSET_END);
+ dsProperties = new RoutineLoadDataSourceProperties("KAFKA",
properties, false);
+ dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE);
+ try {
+ dsProperties.analyze();
+ Assert.assertEquals("127.0.0.1:8080",
dsProperties.getKafkaBrokerList());
+ Assert.assertEquals("test", dsProperties.getKafkaTopic());
+ List<Pair<Integer, Long>> partitinOffsets =
dsProperties.getKafkaPartitionOffsets();
+ Assert.assertEquals(0, partitinOffsets.size());
+ Assert.assertEquals(1,
dsProperties.getCustomKafkaProperties().size());
+ Assert.assertEquals(KafkaProgress.OFFSET_END,
dsProperties.getCustomKafkaProperties().get(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS));
+ } catch (AnalysisException e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testCreateAbnormal() {
+ // can not set KAFKA_OFFSETS_PROPERTY and KAFKA_DEFAULT_OFFSETS
together
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY,
"127.0.0.1:8080");
+ properties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, "test");
+ properties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "0, 1,
2");
+ properties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "1, 1,
1");
+ properties.put("property." +
CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS, "-1");
+ RoutineLoadDataSourceProperties dsProperties = new
RoutineLoadDataSourceProperties("KAFKA", properties, false);
+ dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE);
+ try {
+ dsProperties.analyze();
+ Assert.fail();
+ } catch (UserException e) {
+ Assert.assertTrue(e.getMessage().contains("Only one of
kafka_offsets and kafka_default_offsets can be set."));
+ }
+
+ // can not set datetime formatted offset and integer offset together
+ properties = Maps.newHashMap();
+ properties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY,
"127.0.0.1:8080");
+ properties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, "test");
+ properties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "0, 1,
2");
+ properties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "1,
2020-10-10 12:11:11, 1");
+ dsProperties = new RoutineLoadDataSourceProperties("KAFKA",
properties, false);
+ dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE);
+ try {
+ dsProperties.analyze();
+ Assert.fail();
+ } catch (UserException e) {
+ Assert.assertTrue(e.getMessage().contains("The offset of the
partition cannot be specified by the timestamp " +
+ "and the offset at the same time"));
+ }
+
+ // no partitions but has offset
+ properties = Maps.newHashMap();
+ properties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY,
"127.0.0.1:8080");
+ properties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, "test");
+ properties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "1, 1,
1");
+ dsProperties = new RoutineLoadDataSourceProperties("KAFKA",
properties, false);
+ dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE);
+ try {
+ dsProperties.analyze();
+ Assert.fail();
+ } catch (UserException e) {
+ Assert.assertTrue(e.getMessage().contains("Partitions number
should be equals to offsets number"));
+ }
+ }
+
+ @Test
+ public void testAlterNormal() throws UserException {
+ // normal
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "0, 1,
2");
+ properties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "100,
101, 102");
+ RoutineLoadDataSourceProperties dsProperties = new
RoutineLoadDataSourceProperties("KAFKA", properties, true);
+ dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE);
+ try {
+ dsProperties.analyze();
+ Assert.assertEquals("", dsProperties.getKafkaBrokerList());
+ Assert.assertEquals("", dsProperties.getKafkaTopic());
+ List<Pair<Integer, Long>> partitinOffsets =
dsProperties.getKafkaPartitionOffsets();
+ Assert.assertEquals(3, partitinOffsets.size());
+ Assert.assertEquals(Integer.valueOf(0),
partitinOffsets.get(0).first);
+ Assert.assertEquals(Integer.valueOf(1),
partitinOffsets.get(1).first);
+ Assert.assertEquals(Integer.valueOf(2),
partitinOffsets.get(2).first);
+ Assert.assertEquals(Long.valueOf(100),
partitinOffsets.get(0).second);
+ Assert.assertEquals(Long.valueOf(101),
partitinOffsets.get(1).second);
+ Assert.assertEquals(Long.valueOf(102),
partitinOffsets.get(2).second);
+ } catch (AnalysisException e) {
+ Assert.fail(e.getMessage());
+ }
+
+ // normal, with datetime
+ properties = Maps.newHashMap();
+ properties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "0, 1,
2");
+ properties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY,
"2021-10-10 11:00:00, 2021-10-10 11:00:00, 2021-10-10 12:00:00");
+ dsProperties = new RoutineLoadDataSourceProperties("KAFKA",
properties, true);
+ dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE);
+ try {
+ // datetime formatted offsets should be converted to timestamps
+ dsProperties.analyze();
+ List<Pair<Integer, Long>> partitinOffsets =
dsProperties.getKafkaPartitionOffsets();
+ Assert.assertEquals(3, partitinOffsets.size());
+ Assert.assertEquals(Integer.valueOf(0),
partitinOffsets.get(0).first);
+ Assert.assertEquals(Integer.valueOf(1),
partitinOffsets.get(1).first);
+ Assert.assertEquals(Integer.valueOf(2),
partitinOffsets.get(2).first);
+ Assert.assertEquals(Long.valueOf(1633834800000L),
partitinOffsets.get(0).second);
+ Assert.assertEquals(Long.valueOf(1633834800000L),
partitinOffsets.get(1).second);
+ Assert.assertEquals(Long.valueOf(1633838400000L),
partitinOffsets.get(2).second);
+ } catch (AnalysisException e) {
+ Assert.fail(e.getMessage());
+ }
+
+ // normal, with default offset as datetime
+ properties = Maps.newHashMap();
+ properties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "0, 1,
2");
+ properties.put("property." +
CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS, "2020-01-10 00:00:00");
+ dsProperties = new RoutineLoadDataSourceProperties("KAFKA",
properties, true);
+ dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE);
+ try {
+ // datetime formatted default offset should be converted to a timestamp
+ dsProperties.analyze();
+ List<Pair<Integer, Long>> partitinOffsets =
dsProperties.getKafkaPartitionOffsets();
+ Assert.assertEquals(3, partitinOffsets.size());
+ Assert.assertEquals(Integer.valueOf(0),
partitinOffsets.get(0).first);
+ Assert.assertEquals(Integer.valueOf(1),
partitinOffsets.get(1).first);
+ Assert.assertEquals(Integer.valueOf(2),
partitinOffsets.get(2).first);
+ Assert.assertEquals(Long.valueOf(1578585600000L),
partitinOffsets.get(0).second);
+ Assert.assertEquals(Long.valueOf(1578585600000L),
partitinOffsets.get(1).second);
+ Assert.assertEquals(Long.valueOf(1578585600000L),
partitinOffsets.get(2).second);
+ Assert.assertEquals(2,
dsProperties.getCustomKafkaProperties().size());
+ Assert.assertEquals("1578585600000",
dsProperties.getCustomKafkaProperties().get(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS));
+ Assert.assertEquals("2020-01-10 00:00:00",
dsProperties.getCustomKafkaProperties().get(CreateRoutineLoadStmt.KAFKA_ORIGIN_DEFAULT_OFFSETS));
+ } catch (AnalysisException e) {
+ Assert.fail(e.getMessage());
+ }
+
+ // normal, only set default offset, with utc timezone
+ properties = Maps.newHashMap();
+ properties.put("property." +
CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS, "2020-01-10 00:00:00");
+ dsProperties = new RoutineLoadDataSourceProperties("KAFKA",
properties, true);
+ dsProperties.setTimezone(TimeUtils.UTC_TIME_ZONE);
+ try {
+ // default offset datetime should be converted using the UTC timezone
+ dsProperties.analyze();
+ List<Pair<Integer, Long>> partitinOffsets =
dsProperties.getKafkaPartitionOffsets();
+ Assert.assertEquals(0, partitinOffsets.size());
+ Assert.assertEquals(2,
dsProperties.getCustomKafkaProperties().size());
+ Assert.assertEquals("1578614400000",
dsProperties.getCustomKafkaProperties().get(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS));
+ Assert.assertEquals("2020-01-10 00:00:00",
dsProperties.getCustomKafkaProperties().get(CreateRoutineLoadStmt.KAFKA_ORIGIN_DEFAULT_OFFSETS));
+ } catch (AnalysisException e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testAlterAbnormal() {
+ // can not set KAFKA_BROKER_LIST_PROPERTY
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY,
"127.0.0.1:8080");
+ properties.put("property." +
CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS, "-1");
+ RoutineLoadDataSourceProperties dsProperties = new
RoutineLoadDataSourceProperties("KAFKA", properties, true);
+ dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE);
+ try {
+ dsProperties.analyze();
+ Assert.fail();
+ } catch (UserException e) {
+ Assert.assertTrue(e.getMessage().contains("kafka_broker_list is
invalid kafka property"));
+ }
+
+ // can not set datetime formatted offset and integer offset together
+ properties = Maps.newHashMap();
+ properties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "0, 1,
2");
+ properties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "1,
2020-10-10 12:11:11, 1");
+ dsProperties = new RoutineLoadDataSourceProperties("KAFKA",
properties, true);
+ dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE);
+ try {
+ // can not mix datetime formatted offsets and integer offsets
+ dsProperties.analyze();
+ Assert.fail();
+ } catch (UserException e) {
+ Assert.assertTrue(e.getMessage().contains("The offset of the
partition cannot be specified by the timestamp " +
+ "and the offset at the same time"));
+ }
+
+ // no partitions but has offset
+ properties = Maps.newHashMap();
+ properties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "1, 1,
1");
+ dsProperties = new RoutineLoadDataSourceProperties("KAFKA",
properties, true);
+ dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE);
+ try {
+ // setting offsets without partitions should fail
+ dsProperties.analyze();
+ Assert.fail();
+ } catch (UserException e) {
+ Assert.assertTrue(e.getMessage().contains("Partitions number
should be equals to offsets number"));
+ }
+
+ // only set partition
+ properties = Maps.newHashMap();
+ properties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "1, 1,
1");
+ dsProperties = new RoutineLoadDataSourceProperties("KAFKA",
properties, true);
+ dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE);
+ try {
+ dsProperties.analyze();
+ Assert.fail();
+ } catch (UserException e) {
+ Assert.assertTrue(e.getMessage().contains("Must set offset or
default offset with partition property"));
+ }
+ }
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
index 701eb80..7ce75a7 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
@@ -17,12 +17,13 @@
package org.apache.doris.load.routineload;
-import org.apache.doris.analysis.Separator;
import org.apache.doris.analysis.CreateRoutineLoadStmt;
import org.apache.doris.analysis.ImportSequenceStmt;
import org.apache.doris.analysis.LabelName;
import org.apache.doris.analysis.ParseNode;
import org.apache.doris.analysis.PartitionNames;
+import org.apache.doris.analysis.RoutineLoadDataSourceProperties;
+import org.apache.doris.analysis.Separator;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
@@ -43,6 +44,10 @@ import org.apache.doris.thrift.TResourceInfo;
import org.apache.doris.transaction.BeginTransactionException;
import org.apache.doris.transaction.GlobalTransactionMgr;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
import org.apache.kafka.common.PartitionInfo;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -50,10 +55,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -279,9 +280,12 @@ public class KafkaRoutineLoadJobTest {
PartitionInfo partitionInfo = new PartitionInfo(topicName,
Integer.valueOf(s), null, null, null);
kafkaPartitionInfoList.add(partitionInfo);
}
- Deencapsulation.setField(createRoutineLoadStmt,
"kafkaPartitionOffsets", partitionIdToOffset);
- Deencapsulation.setField(createRoutineLoadStmt, "kafkaBrokerList",
serverAddress);
- Deencapsulation.setField(createRoutineLoadStmt, "kafkaTopic",
topicName);
+ RoutineLoadDataSourceProperties dsProperties = new
RoutineLoadDataSourceProperties();
+ dsProperties.setKafkaPartitionOffsets(partitionIdToOffset);
+ Deencapsulation.setField(dsProperties, "kafkaBrokerList",
serverAddress);
+ Deencapsulation.setField(dsProperties, "kafkaTopic", topicName);
+ Deencapsulation.setField(createRoutineLoadStmt,
"dataSourceProperties", dsProperties);
+
long dbId = 1l;
long tableId = 2L;
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java
b/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java
index 3cba3fb..f09a509 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java
@@ -19,7 +19,8 @@ package org.apache.doris.persist;
import org.apache.doris.analysis.CreateRoutineLoadStmt;
import org.apache.doris.analysis.RoutineLoadDataSourceProperties;
-import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.TimeUtils;
import com.google.common.collect.Maps;
@@ -38,7 +39,7 @@ public class AlterRoutineLoadOperationLogTest {
private static String fileName = "./AlterRoutineLoadOperationLogTest";
@Test
- public void testSerializeAlterViewInfo() throws IOException,
AnalysisException {
+ public void testSerializeAlterRoutineLoadOperationLog() throws
IOException, UserException {
// 1. Write objects to file
File file = new File(fileName);
file.createNewFile();
@@ -47,14 +48,15 @@ public class AlterRoutineLoadOperationLogTest {
long jobId = 1000;
Map<String, String> jobProperties = Maps.newHashMap();
jobProperties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY,
"5");
-
+
String typeName = "kafka";
Map<String, String> dataSourceProperties = Maps.newHashMap();
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "0,
1");
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY,
"10000, 20000");
dataSourceProperties.put("property.group.id", "mygroup");
RoutineLoadDataSourceProperties routineLoadDataSourceProperties = new
RoutineLoadDataSourceProperties(typeName,
- dataSourceProperties);
+ dataSourceProperties, true);
+
routineLoadDataSourceProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE);
routineLoadDataSourceProperties.analyze();
AlterRoutineLoadJobOperationLog log = new
AlterRoutineLoadJobOperationLog(jobId,
@@ -69,6 +71,8 @@ public class AlterRoutineLoadOperationLogTest {
AlterRoutineLoadJobOperationLog log2 =
AlterRoutineLoadJobOperationLog.read(in);
Assert.assertEquals(1, log2.getJobProperties().size());
Assert.assertEquals("5",
log2.getJobProperties().get(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY));
+ Assert.assertEquals("",
log2.getDataSourceProperties().getKafkaBrokerList());
+ Assert.assertEquals("",
log2.getDataSourceProperties().getKafkaTopic());
Assert.assertEquals(1,
log2.getDataSourceProperties().getCustomKafkaProperties().size());
Assert.assertEquals("mygroup",
log2.getDataSourceProperties().getCustomKafkaProperties().get("group.id"));
Assert.assertEquals(routineLoadDataSourceProperties.getKafkaPartitionOffsets().get(0),
diff --git a/gensrc/proto/internal_service.proto
b/gensrc/proto/internal_service.proto
index 1a50e94..437fc2f 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -216,6 +216,11 @@ message PStringPair {
required string val = 2;
};
+message PIntegerPair {
+ required int32 key = 1;
+ required int64 val = 2;
+};
+
message PKafkaLoadInfo {
required string brokers = 1;
required string topic = 2;
@@ -224,6 +229,7 @@ message PKafkaLoadInfo {
message PKafkaMetaProxyRequest {
optional PKafkaLoadInfo kafka_info = 1;
+ repeated PIntegerPair offset_times = 3;
};
message PProxyRequest {
@@ -234,9 +240,14 @@ message PKafkaMetaProxyResult {
repeated int32 partition_ids = 1;
};
+message PKafkaPartitionOffsets {
+ repeated PIntegerPair offset_times = 1;
+};
+
message PProxyResult {
required PStatus status = 1;
optional PKafkaMetaProxyResult kafka_meta_result = 2;
+ optional PKafkaPartitionOffsets partition_offsets = 3;
};
// NOTE(zc): If you want to add new method here,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]