This is an automated email from the ASF dual-hosted git repository.
zhangmang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git
The following commit(s) were added to refs/heads/master by this push:
new 0aa013a1 [AURON #2058] Introduce KafkaScanExec (#2072)
0aa013a1 is described below
commit 0aa013a12c99b1acabf6ac6180f246798b25b9c9
Author: zhangmang <[email protected]>
AuthorDate: Fri Mar 6 11:42:34 2026 +0800
[AURON #2058] Introduce KafkaScanExec (#2072)
# Which issue does this PR close?
Closes #2058
# Rationale for this change
* add a native Kafka consumer
# What changes are included in this PR?
* add Protobuf node: KafkaScanExecNode
* add Native kafka_scan_exec.rs
# Are there any user-facing changes?
* No
# How was this patch tested?
* No dedicated integration test is required for kafka_scan_exec.rs
* The unit test kafka_scan_exec#test_flink_kafka_partition_assign validates the
Kafka partition allocation strategy.
---
Cargo.lock | 72 +++
Cargo.toml | 1 +
native-engine/auron-planner/proto/auron.proto | 24 +
native-engine/auron-planner/src/planner.rs | 14 +
native-engine/datafusion-ext-plans/Cargo.toml | 2 +
.../src/flink/kafka_scan_exec.rs | 487 +++++++++++++++++++++
.../datafusion-ext-plans/src/flink/mod.rs | 1 +
.../src/flink/serde/flink_deserializer.rs | 2 +-
native-engine/datafusion-ext-plans/src/lib.rs | 2 +
9 files changed, 604 insertions(+), 1 deletion(-)
diff --git a/Cargo.lock b/Cargo.lock
index 1ac2b500..a26e62c1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1271,7 +1271,9 @@ dependencies = [
"prost-reflect",
"prost-types 0.14.3",
"rand",
+ "rdkafka",
"smallvec 2.0.0-alpha.11",
+ "sonic-rs",
"tokio",
"unchecked-index",
]
@@ -2470,6 +2472,18 @@ dependencies = [
"zlib-rs",
]
+[[package]]
+name = "libz-sys"
+version = "1.1.24"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4735e9cbde5aac84a5ce588f6b23a90b9b0b528f6c5a8db8a4aff300463a0839"
+dependencies = [
+ "cc",
+ "libc",
+ "pkg-config",
+ "vcpkg",
+]
+
[[package]]
name = "linux-raw-sys"
version = "0.4.15"
@@ -2755,6 +2769,28 @@ dependencies = [
"libm",
]
+[[package]]
+name = "num_enum"
+version = "0.7.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b1207a7e20ad57b847bbddc6776b968420d38292bbfe2089accff5e19e82454c"
+dependencies = [
+ "num_enum_derive",
+ "rustversion",
+]
+
+[[package]]
+name = "num_enum_derive"
+version = "0.7.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ff32365de1b6743cb203b710788263c44a03de03802daf96092f2da4fe6ba4d7"
+dependencies = [
+ "proc-macro-crate",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.104",
+]
+
[[package]]
name = "object"
version = "0.36.7"
@@ -3378,6 +3414,36 @@ dependencies = [
"getrandom 0.3.3",
]
+[[package]]
+name = "rdkafka"
+version = "0.36.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1beea247b9a7600a81d4cc33f659ce1a77e1988323d7d2809c7ed1c21f4c316d"
+dependencies = [
+ "futures-channel",
+ "futures-util",
+ "libc",
+ "log",
+ "rdkafka-sys",
+ "serde",
+ "serde_derive",
+ "serde_json",
+ "slab",
+ "tokio",
+]
+
+[[package]]
+name = "rdkafka-sys"
+version = "4.10.0+2.12.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e234cf318915c1059d4921ef7f75616b5219b10b46e9f3a511a15eb4b56a3f77"
+dependencies = [
+ "libc",
+ "libz-sys",
+ "num_enum",
+ "pkg-config",
+]
+
[[package]]
name = "recursive"
version = "0.1.1"
@@ -4338,6 +4404,12 @@ dependencies = [
"wasm-bindgen",
]
+[[package]]
+name = "vcpkg"
+version = "0.2.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
+
[[package]]
name = "version_check"
version = "0.9.5"
diff --git a/Cargo.toml b/Cargo.toml
index a59cd1d5..830d39f8 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -187,6 +187,7 @@ tonic-build = "0.13.1"
transpose = "0.2.3"
unchecked-index = "0.2.2"
zstd = "0.13.3"
+rdkafka = { version = "0.36.0", features = ["tokio"] }
[patch.crates-io]
# datafusion: branch=v49.0.0-blaze
diff --git a/native-engine/auron-planner/proto/auron.proto
b/native-engine/auron-planner/proto/auron.proto
index 8a00d740..49ecd2d2 100644
--- a/native-engine/auron-planner/proto/auron.proto
+++ b/native-engine/auron-planner/proto/auron.proto
@@ -51,6 +51,7 @@ message PhysicalPlanNode {
GenerateExecNode generate = 23;
ParquetSinkExecNode parquet_sink = 24;
OrcScanExecNode orc_scan = 25;
+ KafkaScanExecNode kafka_scan = 26;
}
}
@@ -744,6 +745,29 @@ message ExpandProjection {
repeated PhysicalExprNode expr = 1;
}
+
+message KafkaScanExecNode {
+ string kafka_topic = 1;
+ string kafka_properties_json = 2;
+ Schema schema = 3;
+ int32 batch_size = 4;
+ KafkaStartupMode startup_mode = 5;
+ string auron_operator_id = 6;
+ KafkaFormat data_format = 7;
+ string format_config_json = 8;
+}
+
+enum KafkaFormat {
+ JSON = 0;
+ PROTOBUF = 1;
+}
+
+enum KafkaStartupMode {
+ GROUP_OFFSET = 0;
+ EARLIEST = 1;
+ LATEST = 2;
+ TIMESTAMP = 3;
+}
///////////////////////////////////////////////////////////////////////////////////////////////////
// Task related
///////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/native-engine/auron-planner/src/planner.rs
b/native-engine/auron-planner/src/planner.rs
index 2a34e7cf..a058c67b 100644
--- a/native-engine/auron-planner/src/planner.rs
+++ b/native-engine/auron-planner/src/planner.rs
@@ -72,6 +72,7 @@ use datafusion_ext_plans::{
expand_exec::ExpandExec,
ffi_reader_exec::FFIReaderExec,
filter_exec::FilterExec,
+ flink::kafka_scan_exec::KafkaScanExec,
generate::{create_generator, create_udtf_generator},
generate_exec::GenerateExec,
ipc_reader_exec::IpcReaderExec,
@@ -801,6 +802,19 @@ impl PhysicalPlanner {
props,
)))
}
+ PhysicalPlanType::KafkaScan(kafka_scan) => {
+ let schema = Arc::new(convert_required!(kafka_scan.schema)?);
+ Ok(Arc::new(KafkaScanExec::new(
+ kafka_scan.kafka_topic.clone(),
+ kafka_scan.kafka_properties_json.clone(),
+ schema,
+ kafka_scan.batch_size as i32,
+ kafka_scan.startup_mode,
+ kafka_scan.auron_operator_id.clone(),
+ kafka_scan.data_format,
+ kafka_scan.format_config_json.clone(),
+ )))
+ }
}
}
diff --git a/native-engine/datafusion-ext-plans/Cargo.toml
b/native-engine/datafusion-ext-plans/Cargo.toml
index 88ca18d7..6861eff3 100644
--- a/native-engine/datafusion-ext-plans/Cargo.toml
+++ b/native-engine/datafusion-ext-plans/Cargo.toml
@@ -66,6 +66,8 @@ unchecked-index = { workspace = true }
prost = { workspace = true }
prost-types = { workspace = true }
prost-reflect = { workspace = true }
+rdkafka = { workspace = true }
+sonic-rs = { workspace = true }
[target.'cfg(target_os = "linux")'.dependencies]
procfs = { workspace = true }
diff --git a/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs
b/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs
new file mode 100644
index 00000000..0840b3d7
--- /dev/null
+++ b/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs
@@ -0,0 +1,487 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::{any::Any, collections::HashMap, env, fmt::Formatter, fs, sync::Arc};
+
+use arrow::array::{
+ ArrayBuilder, BinaryArray, BinaryBuilder, Int32Array, Int32Builder,
Int64Array, Int64Builder,
+ RecordBatch,
+};
+use arrow_schema::{DataType, Field, Schema, SchemaRef};
+use auron_jni_bridge::{jni_call_static, jni_get_string, jni_new_string};
+use datafusion::{
+ common::{DataFusionError, Statistics},
+ error::Result,
+ execution::TaskContext,
+ logical_expr::UserDefinedLogicalNode,
+ physical_expr::{EquivalenceProperties, Partitioning::UnknownPartitioning},
+ physical_plan::{
+ DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
SendableRecordBatchStream,
+ execution_plan::{Boundedness, EmissionType},
+ metrics::{ExecutionPlanMetricsSet, MetricsSet},
+ },
+};
+use futures::StreamExt;
+use once_cell::sync::OnceCell;
+use rdkafka::{
+ ClientConfig, ClientContext, Offset, TopicPartitionList,
+ config::RDKafkaLogLevel,
+ consumer::{Consumer, ConsumerContext, Rebalance, StreamConsumer},
+ error::KafkaResult,
+};
+use sonic_rs::{JsonContainerTrait, JsonValueTrait};
+
+use crate::{
+ common::{column_pruning::ExecuteWithColumnPruning,
execution_context::ExecutionContext},
+ flink::serde::{flink_deserializer::FlinkDeserializer,
pb_deserializer::PbDeserializer},
+ rdkafka::Message,
+};
+
+struct CustomContext;
+
+impl ClientContext for CustomContext {}
+
+impl ConsumerContext for CustomContext {
+ fn pre_rebalance(&self, rebalance: &Rebalance) {
+ log::info!("Kafka Pre re-balance {rebalance:?}");
+ }
+
+ fn post_rebalance(&self, rebalance: &Rebalance) {
+ log::info!("Kafka Post re-balance {rebalance:?}");
+ }
+
+ fn commit_callback(&self, result: KafkaResult<()>, _offsets:
&TopicPartitionList) {
+ log::info!("Kafka Committing offsets: {result:?}");
+ }
+}
+
+#[derive(Debug, Clone)]
+pub struct KafkaScanExec {
+ kafka_topic: String,
+ kafka_properties_json: String,
+ schema: SchemaRef,
+ batch_size: i32,
+ startup_mode: i32,
+ auron_operator_id: String,
+ data_format: i32,
+ format_config_json: String,
+ metrics: ExecutionPlanMetricsSet,
+ props: OnceCell<PlanProperties>,
+}
+
+impl KafkaScanExec {
+ pub fn new(
+ kafka_topic: String,
+ kafka_properties_json: String,
+ schema: SchemaRef,
+ batch_size: i32,
+ startup_mode: i32,
+ auron_operator_id: String,
+ data_format: i32,
+ format_config_json: String,
+ ) -> Self {
+ Self {
+ kafka_topic,
+ kafka_properties_json,
+ schema,
+ batch_size,
+ startup_mode,
+ auron_operator_id,
+ data_format,
+ format_config_json,
+ metrics: ExecutionPlanMetricsSet::new(),
+ props: OnceCell::new(),
+ }
+ }
+
+ fn execute_with_ctx(
+ &self,
+ exec_ctx: Arc<ExecutionContext>,
+ ) -> Result<SendableRecordBatchStream> {
+ let serialized_pb_stream = read_serialized_records_from_kafka(
+ exec_ctx.clone(),
+ self.kafka_topic.clone(),
+ self.kafka_properties_json.clone(),
+ self.batch_size as usize,
+ self.startup_mode,
+ self.auron_operator_id.clone(),
+ self.data_format,
+ self.format_config_json.clone(),
+ )?;
+
+ let deserialized_pb_stream = parse_records(
+ exec_ctx.output_schema(),
+ exec_ctx.clone(),
+ serialized_pb_stream,
+ self.format_config_json.clone(),
+ )?;
+ Ok(deserialized_pb_stream)
+ }
+}
+
+impl DisplayAs for KafkaScanExec {
+ fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) ->
std::fmt::Result {
+ write!(f, "KafkaScanExec")
+ }
+}
+
+impl ExecutionPlan for KafkaScanExec {
+ fn name(&self) -> &str {
+ "KafkaScanExec"
+ }
+
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> SchemaRef {
+ self.schema.clone()
+ }
+
+ fn properties(&self) -> &PlanProperties {
+ self.props.get_or_init(|| {
+ PlanProperties::new(
+ EquivalenceProperties::new(self.schema()),
+ UnknownPartitioning(1),
+ EmissionType::Both,
+ Boundedness::Unbounded {
+ requires_infinite_memory: false,
+ },
+ )
+ })
+ }
+
+ fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+ vec![]
+ }
+
+ fn with_new_children(
+ self: Arc<Self>,
+ _children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ Ok(Arc::new(Self::new(
+ self.kafka_topic.clone(),
+ self.kafka_properties_json.clone(),
+ self.schema.clone(),
+ self.batch_size,
+ self.startup_mode,
+ self.auron_operator_id.clone(),
+ self.data_format,
+ self.format_config_json.clone(),
+ )))
+ }
+
+ fn execute(
+ &self,
+ partition: usize,
+ context: Arc<TaskContext>,
+ ) -> Result<SendableRecordBatchStream> {
+ let exec_ctx = ExecutionContext::new(context, partition,
self.schema(), &self.metrics);
+ self.execute_with_ctx(exec_ctx)
+ }
+
+ fn metrics(&self) -> Option<MetricsSet> {
+ Some(self.metrics.clone_inner())
+ }
+
+ fn statistics(&self) -> Result<Statistics> {
+ todo!()
+ }
+}
+
+impl ExecuteWithColumnPruning for KafkaScanExec {
+ fn execute_projected(
+ &self,
+ partition: usize,
+ context: Arc<TaskContext>,
+ projection: &[usize],
+ ) -> Result<SendableRecordBatchStream> {
+ let projected_schema = Arc::new(self.schema().project(projection)?);
+ let exec_ctx =
+ ExecutionContext::new(context, partition,
projected_schema.clone(), &self.metrics);
+ self.execute_with_ctx(exec_ctx)
+ }
+}
+
+// A type alias with your custom consumer can be created for convenience.
+type LoggingConsumer = StreamConsumer<CustomContext>;
+
+fn read_serialized_records_from_kafka(
+ exec_ctx: Arc<ExecutionContext>,
+ kafka_topic: String,
+ kafka_properties_json: String,
+ batch_size: usize,
+ startup_mode: i32,
+ auron_operator_id: String,
+ data_format: i32,
+ format_config_json: String,
+) -> Result<SendableRecordBatchStream> {
+ let context = CustomContext;
+ // get source json string from jni bridge resource
+ let resource_id = jni_new_string!(&auron_operator_id)?;
+ let kafka_task_json_java =
+ jni_call_static!(JniBridge.getResource(resource_id.as_obj()) ->
JObject)?;
+ let kafka_task_json = jni_get_string!(kafka_task_json_java.as_obj().into())
+ .expect("kafka_task_json_java is not valid java string");
+ let task_json = sonic_rs::from_str::<sonic_rs::Value>(&kafka_task_json)
+ .expect("source_json_str is not valid json");
+ let num_readers = task_json
+ .get("num_readers")
+ .as_i64()
+ .expect("num_readers is not valid json") as i32;
+ let subtask_index = task_json
+ .get("subtask_index")
+ .as_i64()
+ .expect("subtask_index is not valid json") as i32;
+ let kafka_properties =
sonic_rs::from_str::<sonic_rs::Value>(&kafka_properties_json)
+ .expect("kafka_properties_json is not valid json");
+ let mut config = ClientConfig::new();
+ config.set_log_level(RDKafkaLogLevel::Info);
+ kafka_properties
+ .into_object()
+ .expect("kafka_properties is not valid json")
+ .iter_mut()
+ .for_each(|(key, value)| {
+ config.set(
+ key,
+ value
+ .as_str()
+ .expect("kafka property value is not valid json string"),
+ );
+ });
+
+ let consumer: Arc<LoggingConsumer> = Arc::new(
+ config
+ .create_with_context(context)
+ .expect("Kafka Consumer creation failed"),
+ );
+ let metadata = consumer
+ .fetch_metadata(Some(&kafka_topic),
Some(std::time::Duration::from_secs(5)))
+ .expect("Failed to fetch kafka metadata");
+
+ // get topic metadata
+ let topic_metadata = metadata
+ .topics()
+ .iter()
+ .find(|t| t.name() == kafka_topic)
+ .expect("Topic not found");
+
+ // get partition metadata
+ let partitions: Vec<i32> = topic_metadata
+ .partitions()
+ .iter()
+ .filter(|p| {
+ flink_kafka_partition_assign(kafka_topic.clone(), p.id(),
num_readers)
+ .expect("flink_kafka_partition_assign failed")
+ == subtask_index
+ })
+ .map(|p| p.id())
+ .collect();
+
+ if partitions.is_empty() {
+ return Err(DataFusionError::Execution(format!(
+ "No partitions found for topic: {kafka_topic}"
+ )));
+ }
+
+ // GROUP_OFFSET = 0;
+ // EARLIEST = 1;
+ // LATEST = 2;
+ // TIMESTAMP = 3;
+ let offset = match startup_mode {
+ 0 => Offset::Stored,
+ 1 => Offset::Beginning,
+ 2 => Offset::End,
+ _ => {
+ return Err(DataFusionError::Execution(format!(
+ "Invalid startup mode: {startup_mode}"
+ )));
+ }
+ };
+
+ log::info!("Subtask {subtask_index} consumed partitions {partitions:?}");
+ let mut partition_list =
TopicPartitionList::with_capacity(partitions.len());
+ for partition in partitions.iter() {
+ partition_list.add_partition_offset(&kafka_topic, *partition, offset);
+ }
+ consumer
+ .assign(&partition_list)
+ .expect("Can't assign partitions to consumer");
+
+ let output_schema = Arc::new(Schema::new(vec![
+ Field::new("serialized_kafka_records_partition", DataType::Int32,
false),
+ Field::new("serialized_kafka_records_offset", DataType::Int64, false),
+ Field::new("serialized_kafka_records_timestamp", DataType::Int64,
false),
+ Field::new("serialized_pb_records", DataType::Binary, false),
+ ]));
+
+ Ok(exec_ctx
+ .with_new_output_schema(output_schema.clone())
+ .output_with_sender("KafkaScanExec.KafkaConsumer", move |sender| async
move {
+ let mut serialized_kafka_records_partition_builder =
Int32Builder::with_capacity(0);
+ let mut serialized_kafka_records_offset_builder =
Int64Builder::with_capacity(0);
+ let mut serialized_kafka_records_timestamp_builder =
Int64Builder::with_capacity(0);
+ let mut serialized_pb_records_builder =
BinaryBuilder::with_capacity(batch_size, 0);
+ loop {
+ while serialized_pb_records_builder.len() < batch_size {
+ match consumer.recv().await {
+ Err(e) => log::warn!("Kafka error: {e}"),
+ Ok(msg) => {
+ if let Some(payload) = msg.payload() {
+ serialized_kafka_records_partition_builder
+ .append_value(msg.partition());
+
serialized_kafka_records_offset_builder.append_value(msg.offset());
+ serialized_kafka_records_timestamp_builder
+
.append_option(msg.timestamp().to_millis());
+
serialized_pb_records_builder.append_value(payload);
+ }
+ }
+ }
+ }
+ let batch = RecordBatch::try_new(
+ output_schema.clone(),
+ vec![
+
Arc::new(serialized_kafka_records_partition_builder.finish()),
+
Arc::new(serialized_kafka_records_offset_builder.finish()),
+
Arc::new(serialized_kafka_records_timestamp_builder.finish()),
+ Arc::new(serialized_pb_records_builder.finish()),
+ ],
+ )?;
+ sender.send(batch).await;
+ }
+ }))
+}
+
+fn parse_records(
+ schema: SchemaRef,
+ exec_ctx: Arc<ExecutionContext>,
+ mut input_stream: SendableRecordBatchStream,
+ parser_config_json: String,
+) -> Result<SendableRecordBatchStream> {
+ let parser_config =
sonic_rs::from_str::<sonic_rs::Value>(&parser_config_json)
+ .expect("parser_config_json is not valid json");
+ let pb_desc_file = parser_config
+ .get("pb_desc_file")
+ .and_then(|v| v.as_str())
+ .expect("pb_desc_file is not valid string")
+ .to_string();
+ let root_message_name = parser_config
+ .get("root_message_name")
+ .and_then(|v| v.as_str())
+ .expect("root_message_name is not valid string")
+ .to_string();
+ let skip_fields = parser_config
+ .get("skip_fields")
+ .and_then(|v| v.as_str())
+ .expect("skip_fields is not valid string")
+ .to_string();
+ let nested_col_mapping_json = parser_config
+ .get("nested_col_mapping")
+ .expect("nested_col_mapping is not valid json");
+ let mut nested_msg_mapping: HashMap<String, String> = HashMap::new();
+ if let Some(obj) = nested_col_mapping_json.as_object() {
+ for (key, value) in obj {
+ if let Some(val_str) = value.as_str() {
+ nested_msg_mapping.insert(key.to_string(),
val_str.to_string());
+ }
+ }
+ }
+
+ let local_pb_desc_file = env::var("PWD").expect("PWD env var is not set")
+ "/" + &pb_desc_file;
+ log::info!("load desc from {local_pb_desc_file}");
+ let file_descriptor_bytes = fs::read(local_pb_desc_file).expect("Failed to
read file");
+ let skip_fields_vec: Vec<String> = skip_fields
+ .split(",")
+ .map(|s| s.to_string())
+ .collect::<Vec<String>>();
+ Ok(exec_ctx.clone().output_with_sender(
+ "KafkaScanExec.ParseRecords",
+ move |sender| async move {
+ // TODO: json parser
+ let mut parser: Box<dyn FlinkDeserializer> =
Box::new(PbDeserializer::new(
+ &file_descriptor_bytes,
+ &root_message_name,
+ schema.clone(),
+ &nested_msg_mapping,
+ &skip_fields_vec,
+ )?);
+ while let Some(batch) = input_stream.next().await.transpose()? {
+ let kafka_partition = batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .expect("input must be Int32Array");
+ let kafka_offset = batch
+ .column(1)
+ .as_any()
+ .downcast_ref::<Int64Array>()
+ .expect("input must be Int64Array");
+ let kafka_timestamp = batch
+ .column(2)
+ .as_any()
+ .downcast_ref::<Int64Array>()
+ .expect("input must be Int64Array");
+ let records = batch
+ .column(3)
+ .as_any()
+ .downcast_ref::<BinaryArray>()
+ .expect("input must be BinaryArray");
+ let output_batch = parser.parse_messages_with_kafka_meta(
+ &records,
+ &kafka_partition,
+ &kafka_offset,
+ &kafka_timestamp,
+ )?;
+ sender.send(output_batch).await;
+ }
+ #[allow(unreachable_code)]
+ Ok(())
+ },
+ ))
+}
+
+fn java_string_hashcode(s: &str) -> i32 {
+ let mut hash: i32 = 0;
+ for c in s.chars() {
+ let mut buf = [0; 2];
+ let encoded = c.encode_utf16(&mut buf);
+ for code_unit in encoded.iter().cloned() {
+ hash = hash.wrapping_mul(31).wrapping_add(code_unit as i32);
+ }
+ }
+ hash
+}
+
+fn flink_kafka_partition_assign(topic: String, partition_id: i32, num_readers:
i32) -> Result<i32> {
+ if num_readers <= 0 {
+ return Err(DataFusionError::Execution(format!(
+ "num_readers must be positive: {num_readers}"
+ )));
+ }
+ // Java hashcode
+ let hash_code = java_string_hashcode(&topic);
+ let start_index = (hash_code.wrapping_mul(31) & i32::MAX) % num_readers;
+ Ok((start_index + partition_id).rem_euclid(num_readers))
+}
+
+#[test]
+fn test_flink_kafka_partition_assign() {
+ let topic = "flink_test_topic".to_string();
+ let partition_id = 0;
+ let num_readers = 1000;
+ // the result same with flink
+ let result = flink_kafka_partition_assign(topic, partition_id,
num_readers);
+ assert_eq!(result.expect("Error assigning partition"), 471);
+}
diff --git a/native-engine/datafusion-ext-plans/src/flink/mod.rs
b/native-engine/datafusion-ext-plans/src/flink/mod.rs
index fb72d405..359b29f5 100644
--- a/native-engine/datafusion-ext-plans/src/flink/mod.rs
+++ b/native-engine/datafusion-ext-plans/src/flink/mod.rs
@@ -13,4 +13,5 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+pub mod kafka_scan_exec;
pub mod serde;
diff --git
a/native-engine/datafusion-ext-plans/src/flink/serde/flink_deserializer.rs
b/native-engine/datafusion-ext-plans/src/flink/serde/flink_deserializer.rs
index 6d245ff2..3b8bcfde 100644
--- a/native-engine/datafusion-ext-plans/src/flink/serde/flink_deserializer.rs
+++ b/native-engine/datafusion-ext-plans/src/flink/serde/flink_deserializer.rs
@@ -17,7 +17,7 @@ use arrow::array::{BinaryArray, Int32Array, Int64Array,
RecordBatch};
/// FlinkDeserializer is used to deserialize messages from kafka.
/// Supports Protobuf, JSON, etc.
-pub trait FlinkDeserializer {
+pub trait FlinkDeserializer: Send {
/// Parse messages from kafka, including kafka metadata such as partitions,
/// offsets, and timestamps.
fn parse_messages_with_kafka_meta(
diff --git a/native-engine/datafusion-ext-plans/src/lib.rs
b/native-engine/datafusion-ext-plans/src/lib.rs
index d114c27a..c6339e14 100644
--- a/native-engine/datafusion-ext-plans/src/lib.rs
+++ b/native-engine/datafusion-ext-plans/src/lib.rs
@@ -28,6 +28,8 @@ extern crate datafusion;
extern crate datafusion_ext_commons;
extern crate prost;
extern crate prost_reflect;
+extern crate rdkafka;
+extern crate sonic_rs;
// execution plan implementations
pub mod agg;