This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch kafka_tvf
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/kafka_tvf by this push:
new f5888d3f16a [feature](tvf) doris support kafka tvf (#31889)
f5888d3f16a is described below
commit f5888d3f16a138b0856feb2a761875c97ba6fef0
Author: nanfeng <[email protected]>
AuthorDate: Wed Mar 6 22:43:47 2024 +0800
[feature](tvf) doris support kafka tvf (#31889)
```
mysql> SELECT * FROM kafka(
           "kafka_broker_list" = "127.0.0.1:9092",
           "kafka_topic" = "my-topic",
           "format" = "csv",
           "column_separator" = ",",
           "property.kafka_default_offsets" = "OFFSET_END",
           "csv_schema" = "k1:int;k2:int;k3:int");
+------+------+------+
| k1 | k2 | k3 |
+------+------+------+
| 1111 | 222 | 333 |
| 11 | 220 | 33 |
| 22 | 33 | 44 |
+------+------+------+
3 rows in set (10.52 sec)
```
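
How the query above is executed, in brief: the FE parses the kafka and format properties, fetches the topic's partitions from a BE when no partition list is given, splits the partitions across at most min(alive BE num, desired_concurrent_number, partition num, max_routine_load_task_concurrent_num) concurrent tasks, and sends one TKafkaTvfTask per selected BE through the new send_kafka_tvf_task RPC; each BE builds a StreamLoadContext, reuses the routine load executor (offer_task), and feeds the consumed rows back to the scan through a pipe reader (TFileType::FILE_KAFKA). The snippet below is only a standalone sketch of the partition-division step; the class name and the concrete numbers are invented for illustration and are not part of this patch.

```
import java.util.ArrayList;
import java.util.List;

public class KafkaTvfSplitSketch {
    public static void main(String[] args) {
        // invented example values
        int aliveBeNum = 3;                      // alive backends
        int desiredConcurrentNum = 5;            // "desired_concurrent_number" job property
        int maxTaskConcurrentNum = 5;            // Config.max_routine_load_task_concurrent_num
        int[] partitions = {0, 1, 2, 3, 4, 5, 6};

        // concurrency is the minimum of the four limits (see calculateConcurrentTaskNum)
        int concurrentTaskNum = Math.min(Math.min(aliveBeNum, desiredConcurrentNum),
                Math.min(partitions.length, maxTaskConcurrentNum));

        // task i gets partitions i, i + N, i + 2N, ... (see createTKafkaTvfTask)
        for (int i = 0; i < concurrentTaskNum; i++) {
            List<Integer> taskPartitions = new ArrayList<>();
            for (int j = i; j < partitions.length; j += concurrentTaskNum) {
                taskPartitions.add(partitions[j]);
            }
            System.out.println("task " + i + " -> partitions " + taskPartitions);
        }
    }
}
```

With the example values this prints task 0 -> [0, 3, 6], task 1 -> [1, 4], task 2 -> [2, 5].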
---
.../routine_load/routine_load_task_executor.cpp | 6 +-
.../routine_load/routine_load_task_executor.h | 4 +-
be/src/service/backend_service.cpp | 40 +++
be/src/service/backend_service.h | 4 +
be/src/vec/exec/format/csv/csv_reader.cpp | 6 +-
be/src/vec/exec/format/json/new_json_reader.cpp | 2 +-
.../org/apache/doris/analysis/StorageBackend.java | 3 +-
.../doris/catalog/BuiltinTableValuedFunctions.java | 4 +-
.../apache/doris/datasource/FileQueryScanNode.java | 28 +-
.../doris/datasource/tvf/source/TVFScanNode.java | 3 +-
.../trees/expressions/functions/table/Kafka.java | 56 +++
.../visitor/TableValuedFunctionVisitor.java | 5 +
.../tablefunction/KafkaTableValuedFunction.java | 388 +++++++++++++++++++++
.../org/apache/doris/common/GenericPoolTest.java | 5 +
.../apache/doris/utframe/MockedBackendFactory.java | 5 +
gensrc/thrift/BackendService.thrift | 11 +
gensrc/thrift/Types.thrift | 1 +
17 files changed, 561 insertions(+), 10 deletions(-)
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index d22f2bb4a8c..6c14811898c 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -248,6 +248,10 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
         return Status::InternalError("unknown load source type");
     }
+    return offer_task(ctx);
+}
+
+Status RoutineLoadTaskExecutor::offer_task(std::shared_ptr<StreamLoadContext> ctx) {
     VLOG_CRITICAL << "receive a new routine load task: " << ctx->brief();
     // register the task
     _task_map[ctx->id] = ctx;
@@ -326,7 +330,7 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx,
     // must put pipe before executing plan fragment
     HANDLE_ERROR(_exec_env->new_load_stream_mgr()->put(ctx->id, ctx), "failed to add pipe");
-    if (!ctx->is_multi_table) {
+    if (!ctx->is_multi_table && ctx->load_type == TLoadType::ROUTINE_LOAD) {
         // only for normal load, single-stream-multi-table load will be planned during consuming
 #ifndef BE_TEST
         // execute plan fragment, async
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h b/be/src/runtime/routine_load/routine_load_task_executor.h
index e4ad8be5921..8215a309d77 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.h
+++ b/be/src/runtime/routine_load/routine_load_task_executor.h
@@ -65,6 +65,8 @@ public:
     Status get_kafka_latest_offsets_for_partitions(const PKafkaMetaProxyRequest& request,
                                                    std::vector<PIntegerPair>* partition_offsets);
+    Status offer_task(std::shared_ptr<StreamLoadContext> ctx);
+
 private:
     // execute the task
     void exec_task(std::shared_ptr<StreamLoadContext> ctx, DataConsumerPool* pool,
@@ -80,7 +82,7 @@ private:
                      std::shared_ptr<StreamLoadContext> ctx);
 private:
-    ExecEnv* _exec_env = nullptr;
+    ExecEnv* _exec_env;
     PriorityThreadPool _thread_pool;
     DataConsumerPool _data_consumer_pool;
diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp
index 68adeb1abe2..96622101ab9 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -59,6 +59,7 @@
 #include "olap/tablet_manager.h"
 #include "olap/tablet_meta.h"
 #include "olap/txn_manager.h"
+#include "runtime/define_primitive_type.h"
 #include "runtime/exec_env.h"
 #include "runtime/external_scan_context_mgr.h"
 #include "runtime/fragment_mgr.h"
@@ -566,6 +567,45 @@ void BaseBackendService::submit_routine_load_task(TStatus& t_status,
     return Status::OK().to_thrift(&t_status);
 }
+void BackendService::send_kafka_tvf_task(TStatus& t_status, const TKafkaTvfTask& task) {
+    LOG(INFO) << "get kafka tvf task from fe, query_id:" << UniqueId(task.id);
+
+    // parse parameters
+    // create the context
+    std::shared_ptr<StreamLoadContext> ctx = std::make_shared<StreamLoadContext>(_exec_env);
+    ctx->load_type = TLoadType::MANUL_LOAD;
+    ctx->load_src_type = task.type;
+    ctx->id = UniqueId(task.id);
+
+    if (task.__isset.max_interval_s) {
+        ctx->max_interval_s = task.max_interval_s;
+    }
+    if (task.__isset.max_batch_rows) {
+        ctx->max_batch_rows = task.max_batch_rows;
+    }
+    if (task.__isset.max_batch_size) {
+        ctx->max_batch_size = task.max_batch_size;
+    }
+
+    // set source related params
+    switch (task.type) {
+    case TLoadSourceType::KAFKA:
+        ctx->kafka_info.reset(new KafkaLoadInfo(task.info));
+        break;
+    default:
+        LOG(WARNING) << "unknown load source type: " << task.type;
+        return Status::InternalError("unknown load source type").to_thrift(&t_status);
+    }
+
+    Status st = _exec_env->routine_load_task_executor()->offer_task(ctx);
+    if (!st.ok()) {
+        LOG(WARNING) << "failed to submit kafka tvf task. task id: " << task.id;
+        return st.to_thrift(&t_status);
+    }
+
+    return Status::OK().to_thrift(&t_status);
+}
+
 /*
  * 1. validate user privilege (todo)
  * 2. FragmentMgr#exec_plan_fragment
diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h
index 3aaee529735..7e35e5bbe68 100644
--- a/be/src/service/backend_service.h
+++ b/be/src/service/backend_service.h
@@ -18,6 +18,7 @@
#pragma once
#include <gen_cpp/BackendService.h>
+#include <gen_cpp/BackendService_types.h>
#include <memory>
#include <string>
@@ -46,6 +47,7 @@ class TDiskTrashInfo;
class TCancelPlanFragmentParams;
class TCheckStorageFormatResult;
class TRoutineLoadTask;
+class TKafkaTvfTask;
class TScanBatchResult;
class TScanCloseParams;
class TScanCloseResult;
@@ -102,6 +104,8 @@ public:
     void submit_routine_load_task(TStatus& t_status,
                                   const std::vector<TRoutineLoadTask>& tasks) override;
+    void send_kafka_tvf_task(TStatus& t_status, const TKafkaTvfTask& tasks) override;
+
     // used for external service, open means start the scan procedure
     void open_scanner(TScanOpenResult& result_, const TScanOpenParams& params) override;
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp
index 86986f8eea5..20af3600e62 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -291,7 +291,7 @@ Status CsvReader::init_reader(bool is_load) {
         _skip_lines = 1;
     }
-    if (_params.file_type == TFileType::FILE_STREAM) {
+    if (_params.file_type == TFileType::FILE_STREAM || _params.file_type == TFileType::FILE_KAFKA) {
         RETURN_IF_ERROR(
                 FileFactory::create_pipe_reader(_range.load_id, &_file_reader, _state, false));
     } else {
@@ -859,7 +859,7 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) {
     _file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0;
     io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state, _file_description);
-    if (_params.file_type == TFileType::FILE_STREAM) {
+    if (_params.file_type == TFileType::FILE_STREAM || _params.file_type == TFileType::FILE_KAFKA) {
         // Due to http_stream needs to pre read a portion of the data to parse column information, so it is set to true here
         RETURN_IF_ERROR(
                 FileFactory::create_pipe_reader(_params.load_id, &_file_reader, _state, true));
@@ -869,7 +869,7 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) {
                                                 &_file_reader));
     }
     if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM &&
-        _params.file_type != TFileType::FILE_BROKER) {
+        _params.file_type != TFileType::FILE_KAFKA && _params.file_type != TFileType::FILE_BROKER) {
         return Status::EndOfFile("get parsed schema failed, empty csv file: " + _range.path);
     }
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp
index 514a925cba4..81a48200641 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -376,7 +376,7 @@ Status NewJsonReader::_open_file_reader(bool need_schema) {
     _current_offset = start_offset;
-    if (_params.file_type == TFileType::FILE_STREAM) {
+    if (_params.file_type == TFileType::FILE_STREAM || _params.file_type == TFileType::FILE_KAFKA) {
         // Due to http_stream needs to pre read a portion of the data to parse column information, so it is set to true here
         RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader, _state,
                                                         need_schema));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java
index 3b4cfefc99e..08cb04b4cf2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java
@@ -146,7 +146,8 @@ public class StorageBackend implements ParseNode {
OFS("Tencent CHDFS"),
GFS("Tencent Goose File System"),
JFS("Juicefs"),
- STREAM("Stream load pipe");
+ STREAM("Stream load pipe"),
+ KAFKA("Kafka load pipe");
private final String description;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
index acdeb683f26..8bdd71a2506 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
@@ -28,6 +28,7 @@ import org.apache.doris.nereids.trees.expressions.functions.table.Hdfs;
 import org.apache.doris.nereids.trees.expressions.functions.table.HttpStream;
 import org.apache.doris.nereids.trees.expressions.functions.table.IcebergMeta;
 import org.apache.doris.nereids.trees.expressions.functions.table.Jobs;
+import org.apache.doris.nereids.trees.expressions.functions.table.Kafka;
 import org.apache.doris.nereids.trees.expressions.functions.table.Local;
 import org.apache.doris.nereids.trees.expressions.functions.table.MvInfos;
 import org.apache.doris.nereids.trees.expressions.functions.table.Numbers;
@@ -61,7 +62,8 @@ public class BuiltinTableValuedFunctions implements FunctionHelper {
             tableValued(Jobs.class, "jobs"),
             tableValued(Tasks.class, "tasks"),
             tableValued(WorkloadGroups.class, "workload_groups"),
-            tableValued(ActiveBeTasks.class, "active_be_tasks")
+            tableValued(ActiveBeTasks.class, "active_be_tasks"),
+            tableValued(Kafka.class, "kafka")
     );
     public static final BuiltinTableValuedFunctions INSTANCE = new BuiltinTableValuedFunctions();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
index 7afb04831ce..293955c6752 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
@@ -273,7 +273,8 @@ public abstract class FileQueryScanNode extends FileScanNode {
             ConnectContext.get().getExecutor().getSummaryProfile().setGetSplitsFinishTime();
         }
         this.inputSplitsNum = inputSplits.size();
-        if (inputSplits.isEmpty() && !(getLocationType() == TFileType.FILE_STREAM)) {
+        if (inputSplits.isEmpty() && !(getLocationType() == TFileType.FILE_STREAM)
+                && !(getLocationType() == TFileType.FILE_KAFKA)) {
             return;
         }
         TFileFormatType fileFormatType = getFileFormatType();
@@ -307,6 +308,31 @@ public abstract class FileQueryScanNode extends FileScanNode {
                 curLocations.addToLocations(location);
                 scanRangeLocations.add(curLocations);
                 return;
+            } else if (getLocationType() == TFileType.FILE_KAFKA) {
+                params.setFileType(TFileType.FILE_KAFKA);
+                FunctionGenTable table = (FunctionGenTable) this.desc.getTable();
+                KafkaTableValuedFunction tableValuedFunction = (KafkaTableValuedFunction) table.getTvf();
+                params.setCompressType(tableValuedFunction.getTFileCompressType());
+
+                Map<Long, TKafkaTvfTask> kafkaTvfTaskMap = tableValuedFunction.getKafkaTvfTaskMap();
+                for (Entry<Long, TKafkaTvfTask> entry : kafkaTvfTaskMap.entrySet()) {
+                    TScanRangeLocations curLocations = newLocations();
+                    TFileRangeDesc rangeDesc = new TFileRangeDesc();
+                    rangeDesc.setLoadId(ConnectContext.get().queryId());
+                    rangeDesc.setSize(-1);
+                    rangeDesc.setFileSize(-1);
+                    curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
+                    curLocations.getScanRange().getExtScanRange().getFileScanRange().setParams(params);
+
+                    TScanRangeLocation location = new TScanRangeLocation();
+                    long backendId = entry.getKey();
+                    Backend backend = Env.getCurrentSystemInfo().getIdToBackend().get(backendId);
+                    location.setBackendId(backendId);
+                    location.setServer(new TNetworkAddress(backend.getHost(), backend.getBePort()));
+                    curLocations.addToLocations(location);
+                    scanRangeLocations.add(curLocations);
+                }
+                return;
             }
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
index dcd248a9782..3546cb5fc95 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
@@ -127,7 +127,8 @@ public class TVFScanNode extends FileQueryScanNode {
     @Override
     public List<Split> getSplits() throws UserException {
         List<Split> splits = Lists.newArrayList();
-        if (tableValuedFunction.getTFileType() == TFileType.FILE_STREAM) {
+        if (tableValuedFunction.getTFileType() == TFileType.FILE_STREAM
+                || tableValuedFunction.getTFileType() == TFileType.FILE_KAFKA) {
             return splits;
         }
         List<TBrokerFileStatus> fileStatuses = tableValuedFunction.getFileStatuses();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Kafka.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Kafka.java
new file mode 100644
index 00000000000..38afb8c3da5
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Kafka.java
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.expressions.functions.table;
+
+import org.apache.doris.catalog.FunctionSignature;
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.nereids.trees.expressions.Properties;
+import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
+import org.apache.doris.nereids.types.coercion.AnyDataType;
+import org.apache.doris.tablefunction.KafkaTableValuedFunction;
+import org.apache.doris.tablefunction.TableValuedFunctionIf;
+
+import java.util.Map;
+
+/** kafka tvf **/
+public class Kafka extends TableValuedFunction {
+ public Kafka(Properties properties) {
+ super("kafka", properties);
+ }
+
+ @Override
+ public FunctionSignature customSignature() {
+        return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes());
+ }
+
+ @Override
+ protected TableValuedFunctionIf toCatalogFunction() {
+ try {
+ Map<String, String> arguments = getTVFProperties().getMap();
+ return new KafkaTableValuedFunction(arguments);
+ } catch (Throwable t) {
+            throw new AnalysisException("Can not build KafkaTableValuedFunction by "
+                    + this + ": " + t.getMessage(), t);
+ }
+ }
+
+ @Override
+ public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
+ return visitor.visitKafka(this, context);
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
index 36561e5b12c..cfc67ea4796 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
@@ -28,6 +28,7 @@ import org.apache.doris.nereids.trees.expressions.functions.table.Hdfs;
import org.apache.doris.nereids.trees.expressions.functions.table.HttpStream;
import org.apache.doris.nereids.trees.expressions.functions.table.IcebergMeta;
import org.apache.doris.nereids.trees.expressions.functions.table.Jobs;
+import org.apache.doris.nereids.trees.expressions.functions.table.Kafka;
import org.apache.doris.nereids.trees.expressions.functions.table.Local;
import org.apache.doris.nereids.trees.expressions.functions.table.MvInfos;
import org.apache.doris.nereids.trees.expressions.functions.table.Numbers;
@@ -100,6 +101,10 @@ public interface TableValuedFunctionVisitor<R, C> {
return visitTableValuedFunction(s3, context);
}
+ default R visitKafka(Kafka kafka, C context) {
+ return visitTableValuedFunction(kafka, context);
+ }
+
default R visitWorkloadGroups(WorkloadGroups workloadGroups, C context) {
return visitTableValuedFunction(workloadGroups, context);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/KafkaTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/KafkaTableValuedFunction.java
new file mode 100644
index 00000000000..541afc4c0c8
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/KafkaTableValuedFunction.java
@@ -0,0 +1,388 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tablefunction;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.analysis.CreateRoutineLoadStmt;
+import org.apache.doris.analysis.LoadStmt;
+import org.apache.doris.analysis.StorageBackend;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.LoadException;
+import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.common.util.SmallFileMgr;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.common.util.Util;
+import org.apache.doris.datasource.kafka.KafkaUtil;
+import org.apache.doris.load.routineload.RoutineLoadJob;
+import org.apache.doris.load.routineload.kafka.KafkaConfiguration;
+import org.apache.doris.load.routineload.kafka.KafkaDataSourceProperties;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.BeSelectionPolicy;
+import org.apache.doris.thrift.BackendService;
+import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TKafkaLoadInfo;
+import org.apache.doris.thrift.TKafkaTvfTask;
+import org.apache.doris.thrift.TLoadSourceType;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStatus;
+import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.thrift.TUniqueId;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import lombok.Getter;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+public class KafkaTableValuedFunction extends ExternalFileTableValuedFunction {
+ public static final String NAME = "kafka";
+ private String timezone = TimeUtils.DEFAULT_TIME_ZONE;
+
+ private String brokerList;
+
+ private String topic;
+
+ // for kafka properties
+ private KafkaDataSourceProperties kafkaDataSourceProperties;
+ private Map<String, String> customKafkaProperties = Maps.newHashMap();
+ private final List<Integer> kafkaPartitions = Lists.newArrayList();
+ private Boolean hasCustomPartitions = true;
+
+ // for jobProperties
+ private final Map<String, Long> jobProperties = Maps.newHashMap();
+
+ private Long dbId = -1L;
+
+ @Getter
+ private final Map<Long, TKafkaTvfTask> kafkaTvfTaskMap = Maps.newHashMap();
+
+ public static final String PROP_GROUP_ID = "group.id";
+ public static final String KAFKA_FILE_CATALOG = "kafka";
+
+    public KafkaTableValuedFunction(Map<String, String> properties) throws UserException {
+        // 1. parse and analyze common properties
+        Map<String, String> otherProperties = super.parseCommonProperties(properties);
+
+        // 2. parse and analyze kafka properties
+        parseAndAnalyzeKafkaProperties(otherProperties);
+
+        // 3. parse and analyze job properties
+        parseAndAnalyzeJobProperties(otherProperties);
+
+        // 4. divide partitions
+        int concurrentTaskNum = calculateConcurrentTaskNum();
+        getKafkaTvfTaskInfoList(concurrentTaskNum);
+
+        // 5. transfer partition list to be
+        transferPartitionListToBe();
+    }
+
+ @Override
+ public TFileType getTFileType() {
+ return TFileType.FILE_KAFKA;
+ }
+
+ @Override
+ public String getFilePath() {
+ return null;
+ }
+
+    @Override
+    public BrokerDesc getBrokerDesc() {
+        return new BrokerDesc("KafkaTvfBroker", StorageBackend.StorageType.KAFKA, locationProperties);
+    }
+
+    @Override
+    public String getTableName() {
+        return "KafkaTableValuedFunction";
+    }
+
+    protected void setOptional() throws UserException {
+        // set custom kafka partitions
+        if (CollectionUtils.isNotEmpty(kafkaDataSourceProperties.getKafkaPartitionOffsets())) {
+            setKafkaPartitions();
+        }
+        // set kafka customProperties
+        if (MapUtils.isNotEmpty(kafkaDataSourceProperties.getCustomKafkaProperties())) {
+            setCustomKafkaProperties(kafkaDataSourceProperties.getCustomKafkaProperties());
+        }
+        // set group id if not specified
+        this.customKafkaProperties.putIfAbsent(PROP_GROUP_ID, "_" + UUID.randomUUID());
+    }
+
+    private void setKafkaPartitions() throws LoadException {
+        // get kafka partition offsets
+        List<Pair<Integer, Long>> kafkaPartitionOffsets = kafkaDataSourceProperties.getKafkaPartitionOffsets();
+        boolean isForTimes = kafkaDataSourceProperties.isOffsetsForTimes();
+        if (isForTimes) {
+            // if the offset is set by date time, we need to get the real offset by time
+            // need to communicate with be
+            // TODO not need ssl file?
+            kafkaPartitionOffsets = KafkaUtil.getOffsetsForTimes(kafkaDataSourceProperties.getBrokerList(),
+                    kafkaDataSourceProperties.getTopic(),
+                    customKafkaProperties, kafkaDataSourceProperties.getKafkaPartitionOffsets());
+        }
+
+        // get partition number list, eg:[0,1,2]
+        for (Pair<Integer, Long> partitionOffset : kafkaPartitionOffsets) {
+            this.kafkaPartitions.add(partitionOffset.first);
+        }
+    }
+
+    private void setCustomKafkaProperties(Map<String, String> kafkaProperties) {
+        this.customKafkaProperties = kafkaProperties;
+    }
+
+    private void checkCustomProperties() throws DdlException {
+        SmallFileMgr smallFileMgr = Env.getCurrentEnv().getSmallFileMgr();
+        for (Map.Entry<String, String> entry : customKafkaProperties.entrySet()) {
+            if (entry.getValue().startsWith("FILE:")) {
+                if (dbId == -1L) {
+                    throw new DdlException("No db specified for storing ssl files");
+                }
+
+                String file = entry.getValue().substring(entry.getValue().indexOf(":") + 1);
+                // check file
+                if (!smallFileMgr.containsFile(dbId, KAFKA_FILE_CATALOG, file)) {
+                    throw new DdlException("File " + file + " does not exist in db "
+                            + dbId + " with catalog: " + KAFKA_FILE_CATALOG);
+                }
+            }
+        }
+    }
+
+    private void checkPartition() throws UserException {
+        // user did not define kafka partitions and BE returned no partitions
+        if (kafkaPartitions.isEmpty()) {
+            throw new AnalysisException("there is no available kafka partition");
+        }
+        if (!hasCustomPartitions) {
+            return;
+        }
+
+        // get all kafka partitions from be
+        List<Integer> allKafkaPartitions = getAllKafkaPartitions();
+
+        for (Integer customPartition : kafkaPartitions) {
+            if (!allKafkaPartitions.contains(customPartition)) {
+                throw new LoadException("there is a custom kafka partition " + customPartition
+                        + " which is invalid for topic " + kafkaDataSourceProperties.getTopic());
+            }
+        }
+    }
+
+    private List<Integer> getAllKafkaPartitions() throws UserException {
+        convertCustomProperties();
+        return KafkaUtil.getAllKafkaPartitions(brokerList, topic, customKafkaProperties);
+    }
+
+    private void convertCustomProperties() throws DdlException {
+        SmallFileMgr smallFileMgr = Env.getCurrentEnv().getSmallFileMgr();
+        for (Map.Entry<String, String> entry : customKafkaProperties.entrySet()) {
+            if (entry.getValue().startsWith("FILE:")) {
+                // convert FILE:file_name -> FILE:file_id:md5
+                String file = entry.getValue().substring(entry.getValue().indexOf(":") + 1);
+                SmallFileMgr.SmallFile smallFile = smallFileMgr.getSmallFile(dbId, KAFKA_FILE_CATALOG, file, true);
+                customKafkaProperties.put(entry.getKey(), "FILE:" + smallFile.id + ":" + smallFile.md5);
+            } else {
+                customKafkaProperties.put(entry.getKey(), entry.getValue());
+            }
+        }
+    }
+
+    public void parseAndAnalyzeKafkaProperties(Map<String, String> properties) throws UserException {
+        // kafka partition offsets may be given in date-time format
+        // get time zone, to convert time into timestamps during the analysis phase
+        if (ConnectContext.get() != null) {
+            timezone = ConnectContext.get().getSessionVariable().getTimeZone();
+        }
+        timezone = TimeUtils.checkTimeZoneValidAndStandardize(properties.getOrDefault(
+                LoadStmt.TIMEZONE, timezone));
+
+        topic = properties.getOrDefault(KafkaConfiguration.KAFKA_TOPIC.getName(), "");
+        brokerList = properties.getOrDefault(KafkaConfiguration.KAFKA_BROKER_LIST.getName(), "");
+        Preconditions.checkState(!Strings.isNullOrEmpty(topic), "topic must be set before analyzing");
+        Preconditions.checkState(!Strings.isNullOrEmpty(brokerList), "broker list must be set before analyzing");
+
+        String partitionStr = properties.getOrDefault(KafkaConfiguration.KAFKA_PARTITIONS.getName(), "");
+        if (partitionStr.isEmpty()) {
+            hasCustomPartitions = false;
+            // get all kafka partitions from be
+            List<Integer> allKafkaPartitions = getAllKafkaPartitions();
+            partitionStr = allKafkaPartitions.stream()
+                    .map(String::valueOf)
+                    .collect(Collectors.joining(","));
+            properties.put(KafkaConfiguration.KAFKA_PARTITIONS.getName(), partitionStr);
+        }
+        // parse and analyze kafka properties, broker list and topic are required, others are optional
+        this.kafkaDataSourceProperties = new KafkaDataSourceProperties(properties);
+        this.kafkaDataSourceProperties.setTimezone(this.timezone);
+        this.kafkaDataSourceProperties.analyze();
+        // get ssl file db
+        String tableName = properties.getOrDefault(KAFKA_FILE_CATALOG, "");
+        if (!tableName.isEmpty()) {
+            Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(tableName);
+            dbId = db.getId();
+        }
+
+        // set custom properties and partition, include some converted operations
+        setOptional();
+        checkCustomProperties();
+        checkPartition();
+    }
+
+    public void parseAndAnalyzeJobProperties(Map<String, String> properties) throws AnalysisException {
+        // Copy the properties, because we will remove the key from properties.
+        Map<String, String> copiedProps = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
+        copiedProps.putAll(properties);
+
+        // get job properties and check range
+        long desiredConcurrentNum = Util.getLongPropertyOrDefault(
+                copiedProps.get(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY),
+                Config.max_routine_load_task_concurrent_num,
+                CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PRED,
+                CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY + " should > 0");
+        long maxBatchIntervalS = Util.getLongPropertyOrDefault(copiedProps.get(
+                CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY),
+                RoutineLoadJob.DEFAULT_MAX_INTERVAL_SECOND, CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_PRED,
+                CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY + " should between 1 and 60");
+        long maxBatchRows = Util.getLongPropertyOrDefault(copiedProps.get(
+                CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY),
+                RoutineLoadJob.DEFAULT_MAX_BATCH_ROWS, CreateRoutineLoadStmt.MAX_BATCH_ROWS_PRED,
+                CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY + " should > 200000");
+        long maxBatchSizeBytes = Util.getLongPropertyOrDefault(copiedProps.get(
+                CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY),
+                RoutineLoadJob.DEFAULT_MAX_BATCH_SIZE, CreateRoutineLoadStmt.MAX_BATCH_SIZE_PRED,
+                CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY + " should between 100MB and 1GB");
+
+        // put into jobProperties
+        jobProperties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, desiredConcurrentNum);
+        jobProperties.put(CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY, maxBatchIntervalS);
+        jobProperties.put(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY, maxBatchRows);
+        jobProperties.put(CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY, maxBatchSizeBytes);
+    }
+
+    public void getKafkaTvfTaskInfoList(int currentConcurrentTaskNum) throws AnalysisException {
+        // select be by policy
+        BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needQueryAvailable().needLoadAvailable().build();
+        List<Long> backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, currentConcurrentTaskNum);
+        if (backendIds.isEmpty() || backendIds.size() != currentConcurrentTaskNum) {
+            throw new AnalysisException("No available backends or incorrect number of backends"
+                    + ", policy: " + policy);
+        }
+
+        for (int i = 0; i < currentConcurrentTaskNum; i++) {
+            TKafkaTvfTask tKafkaLoadInfo = createTKafkaTvfTask(ConnectContext.get().queryId(),
+                    currentConcurrentTaskNum, i);
+            kafkaTvfTaskMap.put(backendIds.get(i), tKafkaLoadInfo);
+        }
+
+    }
+
+    public int calculateConcurrentTaskNum() throws AnalysisException {
+        int aliveBeNums = Env.getCurrentSystemInfo().getAllBackendIds(true).size();
+        int desireTaskConcurrentNum = jobProperties.get(
+                CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY).intValue();
+        int partitionNum = kafkaPartitions.size();
+
+        int[] numbers = {aliveBeNums, desireTaskConcurrentNum, partitionNum,
+                Config.max_routine_load_task_concurrent_num};
+        int concurrentTaskNum = Arrays.stream(numbers).min().getAsInt();
+        if (concurrentTaskNum == 0) {
+            throw new AnalysisException("concurrent task number is 0");
+        }
+
+        return concurrentTaskNum;
+    }
+
+    private TKafkaTvfTask createTKafkaTvfTask(TUniqueId uniqueId, int currentConcurrentTaskNum, int i) {
+        Map<Integer, Long> taskKafkaProgress = Maps.newHashMap();
+        List<Pair<Integer, Long>> kafkaPartitionOffsets = kafkaDataSourceProperties.getKafkaPartitionOffsets();
+        for (int j = i; j < kafkaPartitionOffsets.size(); j = j + currentConcurrentTaskNum) {
+            Pair<Integer, Long> pair = kafkaPartitionOffsets.get(j);
+            taskKafkaProgress.put(pair.key(), pair.value());
+        }
+
+        TKafkaLoadInfo tKafkaLoadInfo = new TKafkaLoadInfo();
+        tKafkaLoadInfo.setBrokers(kafkaDataSourceProperties.getBrokerList());
+        tKafkaLoadInfo.setTopic(kafkaDataSourceProperties.getTopic());
+        tKafkaLoadInfo.setProperties(kafkaDataSourceProperties.getCustomKafkaProperties());
+        tKafkaLoadInfo.setPartitionBeginOffset(taskKafkaProgress);
+
+        TKafkaTvfTask tKafkaTvfTask = new TKafkaTvfTask(TLoadSourceType.KAFKA, uniqueId, tKafkaLoadInfo);
+        tKafkaTvfTask.setMaxIntervalS(jobProperties.get(CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY));
+        tKafkaTvfTask.setMaxBatchRows(jobProperties.get(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY));
+        tKafkaTvfTask.setMaxBatchSize(jobProperties.get(CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY));
+
+        return tKafkaTvfTask;
+    }
+
+    public void transferPartitionListToBe() throws AnalysisException {
+        for (Entry<Long, TKafkaTvfTask> entry : kafkaTvfTaskMap.entrySet()) {
+            Long beId = entry.getKey();
+            TKafkaTvfTask task = entry.getValue();
+
+            Backend backend = Env.getCurrentSystemInfo().getBackend(beId);
+            if (backend == null) {
+                throw new AnalysisException("failed to send tasks to backend " + beId + " because not exist");
+            }
+
+            TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBePort());
+
+            boolean ok = false;
+            BackendService.Client client = null;
+            try {
+                client = ClientPool.backendPool.borrowObject(address);
+                TStatus tStatus = client.sendKafkaTvfTask(task);
+                ok = true;
+
+                if (tStatus.getStatusCode() != TStatusCode.OK) {
+                    throw new AnalysisException("failed to send task. error code: " + tStatus.getStatusCode()
+                            + ", msg: " + (tStatus.getErrorMsgsSize() > 0 ? tStatus.getErrorMsgs().get(0) : "NaN"));
+                }
+                LOG.debug("send kafka tvf task {} to BE: {}", DebugUtil.printId(task.id), beId);
+            } catch (Exception e) {
+                throw new AnalysisException("failed to send task: " + e.getMessage(), e);
+            } finally {
+                if (ok) {
+                    ClientPool.backendPool.returnObject(address, client);
+                } else {
+                    ClientPool.backendPool.invalidateObject(address, client);
+                }
+            }
+
+        }
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
index eb9ac858b3e..684758db357 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
@@ -35,6 +35,7 @@ import org.apache.doris.thrift.TGetTopNHotPartitionsRequest;
import org.apache.doris.thrift.TGetTopNHotPartitionsResponse;
import org.apache.doris.thrift.TIngestBinlogRequest;
import org.apache.doris.thrift.TIngestBinlogResult;
+import org.apache.doris.thrift.TKafkaTvfTask;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPreCacheAsyncRequest;
import org.apache.doris.thrift.TPreCacheAsyncResponse;
@@ -206,6 +207,10 @@ public class GenericPoolTest {
             return null;
         }
+        public TStatus sendKafkaTvfTask(TKafkaTvfTask tasks) throws TException {
+            return null;
+        }
+
         @Override
         public TScanOpenResult openScanner(TScanOpenParams params) throws TException {
             return null;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
index c705893c672..431396e008e 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
@@ -53,6 +53,7 @@ import org.apache.doris.thrift.TGetTopNHotPartitionsResponse;
import org.apache.doris.thrift.THeartbeatResult;
import org.apache.doris.thrift.TIngestBinlogRequest;
import org.apache.doris.thrift.TIngestBinlogResult;
+import org.apache.doris.thrift.TKafkaTvfTask;
import org.apache.doris.thrift.TMasterInfo;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPreCacheAsyncRequest;
@@ -391,6 +392,10 @@ public class MockedBackendFactory {
             return new TStatus(TStatusCode.OK);
         }
+        public TStatus sendKafkaTvfTask(TKafkaTvfTask tasks) throws TException {
+            return new TStatus(TStatusCode.OK);
+        }
+
         @Override
         public TScanOpenResult openScanner(TScanOpenParams params) throws TException {
             return null;
diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift
index 376e2a34df9..7ed4516c4b6 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -70,6 +70,15 @@ struct TRoutineLoadTask {
17: optional bool memtable_on_sink_node;
}
+struct TKafkaTvfTask {
+ 1: required Types.TLoadSourceType type
+ 2: required Types.TUniqueId id
+ 3: required TKafkaLoadInfo info
+ 4: optional i64 max_interval_s
+ 5: optional i64 max_batch_rows
+ 6: optional i64 max_batch_size
+}
+
struct TKafkaMetaProxyRequest {
1: optional TKafkaLoadInfo kafka_info
}
@@ -353,6 +362,8 @@ service BackendService {
Status.TStatus submit_routine_load_task(1:list<TRoutineLoadTask> tasks);
+ Status.TStatus send_kafka_tvf_task(1:TKafkaTvfTask task);
+
    // doris will build a scan context for this session, context_id returned if success
    DorisExternalService.TScanOpenResult open_scanner(1: DorisExternalService.TScanOpenParams params);
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 04a1fd35163..8c01dd086dd 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -666,6 +666,7 @@ enum TFileType {
FILE_S3,
FILE_HDFS,
FILE_NET, // read file by network, such as http
+ FILE_KAFKA,
}
struct TTabletCommitInfo {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]