This is an automated email from the ASF dual-hosted git repository.

zhangmang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git


The following commit(s) were added to refs/heads/master by this push:
     new 0aa013a1 [AURON #2058] Introduce KafkaScanExec (#2072)
0aa013a1 is described below

commit 0aa013a12c99b1acabf6ac6180f246798b25b9c9
Author: zhangmang <[email protected]>
AuthorDate: Fri Mar 6 11:42:34 2026 +0800

    [AURON #2058] Introduce KafkaScanExec (#2072)
    
    # Which issue does this PR close?
    
    Closes #2058
    
    # Rationale for this change
    * add native kafka consumer
    
    # What changes are included in this PR?
    * add Protobuf Node : KafkaScanExecNode
    * add Native kafka_scan_exec.rs
    
    # Are there any user-facing changes?
    * No
    
    # How was this patch tested?
    * No need test for kafka_scan_exec.rs
    * kafka_scan_exec#test_flink_kafka_partition_assign Validate Kafka
    partition allocation strategy.
---
 Cargo.lock                                         |  72 +++
 Cargo.toml                                         |   1 +
 native-engine/auron-planner/proto/auron.proto      |  24 +
 native-engine/auron-planner/src/planner.rs         |  14 +
 native-engine/datafusion-ext-plans/Cargo.toml      |   2 +
 .../src/flink/kafka_scan_exec.rs                   | 487 +++++++++++++++++++++
 .../datafusion-ext-plans/src/flink/mod.rs          |   1 +
 .../src/flink/serde/flink_deserializer.rs          |   2 +-
 native-engine/datafusion-ext-plans/src/lib.rs      |   2 +
 9 files changed, 604 insertions(+), 1 deletion(-)

diff --git a/Cargo.lock b/Cargo.lock
index 1ac2b500..a26e62c1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1271,7 +1271,9 @@ dependencies = [
  "prost-reflect",
  "prost-types 0.14.3",
  "rand",
+ "rdkafka",
  "smallvec 2.0.0-alpha.11",
+ "sonic-rs",
  "tokio",
  "unchecked-index",
 ]
@@ -2470,6 +2472,18 @@ dependencies = [
  "zlib-rs",
 ]
 
+[[package]]
+name = "libz-sys"
+version = "1.1.24"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4735e9cbde5aac84a5ce588f6b23a90b9b0b528f6c5a8db8a4aff300463a0839"
+dependencies = [
+ "cc",
+ "libc",
+ "pkg-config",
+ "vcpkg",
+]
+
 [[package]]
 name = "linux-raw-sys"
 version = "0.4.15"
@@ -2755,6 +2769,28 @@ dependencies = [
  "libm",
 ]
 
+[[package]]
+name = "num_enum"
+version = "0.7.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b1207a7e20ad57b847bbddc6776b968420d38292bbfe2089accff5e19e82454c"
+dependencies = [
+ "num_enum_derive",
+ "rustversion",
+]
+
+[[package]]
+name = "num_enum_derive"
+version = "0.7.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ff32365de1b6743cb203b710788263c44a03de03802daf96092f2da4fe6ba4d7"
+dependencies = [
+ "proc-macro-crate",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.104",
+]
+
 [[package]]
 name = "object"
 version = "0.36.7"
@@ -3378,6 +3414,36 @@ dependencies = [
  "getrandom 0.3.3",
 ]
 
+[[package]]
+name = "rdkafka"
+version = "0.36.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1beea247b9a7600a81d4cc33f659ce1a77e1988323d7d2809c7ed1c21f4c316d"
+dependencies = [
+ "futures-channel",
+ "futures-util",
+ "libc",
+ "log",
+ "rdkafka-sys",
+ "serde",
+ "serde_derive",
+ "serde_json",
+ "slab",
+ "tokio",
+]
+
+[[package]]
+name = "rdkafka-sys"
+version = "4.10.0+2.12.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e234cf318915c1059d4921ef7f75616b5219b10b46e9f3a511a15eb4b56a3f77"
+dependencies = [
+ "libc",
+ "libz-sys",
+ "num_enum",
+ "pkg-config",
+]
+
 [[package]]
 name = "recursive"
 version = "0.1.1"
@@ -4338,6 +4404,12 @@ dependencies = [
  "wasm-bindgen",
 ]
 
+[[package]]
+name = "vcpkg"
+version = "0.2.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
+
 [[package]]
 name = "version_check"
 version = "0.9.5"
diff --git a/Cargo.toml b/Cargo.toml
index a59cd1d5..830d39f8 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -187,6 +187,7 @@ tonic-build = "0.13.1"
 transpose = "0.2.3"
 unchecked-index = "0.2.2"
 zstd = "0.13.3"
+rdkafka = { version = "0.36.0", features = ["tokio"] }
 
 [patch.crates-io]
 # datafusion: branch=v49.0.0-blaze
diff --git a/native-engine/auron-planner/proto/auron.proto b/native-engine/auron-planner/proto/auron.proto
index 8a00d740..49ecd2d2 100644
--- a/native-engine/auron-planner/proto/auron.proto
+++ b/native-engine/auron-planner/proto/auron.proto
@@ -51,6 +51,7 @@ message PhysicalPlanNode {
     GenerateExecNode generate = 23;
     ParquetSinkExecNode parquet_sink = 24;
     OrcScanExecNode orc_scan = 25;
+    KafkaScanExecNode kafka_scan = 26;
   }
 }
 
@@ -744,6 +745,29 @@ message ExpandProjection {
   repeated PhysicalExprNode expr = 1;
 
 }
+
+message KafkaScanExecNode {
+  string kafka_topic = 1;
+  string kafka_properties_json = 2;
+  Schema schema = 3;
+  int32 batch_size = 4;
+  KafkaStartupMode startup_mode = 5;
+  string auron_operator_id = 6;
+  KafkaFormat data_format = 7;
+  string format_config_json = 8;
+}
+
+enum KafkaFormat {
+  JSON = 0;
+  PROTOBUF = 1;
+}
+
+enum KafkaStartupMode {
+  GROUP_OFFSET = 0;
+  EARLIEST = 1;
+  LATEST = 2;
+  TIMESTAMP = 3;
+}
 ///////////////////////////////////////////////////////////////////////////////////////////////////
 // Task related
 ///////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/native-engine/auron-planner/src/planner.rs b/native-engine/auron-planner/src/planner.rs
index 2a34e7cf..a058c67b 100644
--- a/native-engine/auron-planner/src/planner.rs
+++ b/native-engine/auron-planner/src/planner.rs
@@ -72,6 +72,7 @@ use datafusion_ext_plans::{
     expand_exec::ExpandExec,
     ffi_reader_exec::FFIReaderExec,
     filter_exec::FilterExec,
+    flink::kafka_scan_exec::KafkaScanExec,
     generate::{create_generator, create_udtf_generator},
     generate_exec::GenerateExec,
     ipc_reader_exec::IpcReaderExec,
@@ -801,6 +802,19 @@ impl PhysicalPlanner {
                     props,
                 )))
             }
+            PhysicalPlanType::KafkaScan(kafka_scan) => {
+                let schema = Arc::new(convert_required!(kafka_scan.schema)?);
+                Ok(Arc::new(KafkaScanExec::new(
+                    kafka_scan.kafka_topic.clone(),
+                    kafka_scan.kafka_properties_json.clone(),
+                    schema,
+                    kafka_scan.batch_size as i32,
+                    kafka_scan.startup_mode,
+                    kafka_scan.auron_operator_id.clone(),
+                    kafka_scan.data_format,
+                    kafka_scan.format_config_json.clone(),
+                )))
+            }
         }
     }
 
diff --git a/native-engine/datafusion-ext-plans/Cargo.toml b/native-engine/datafusion-ext-plans/Cargo.toml
index 88ca18d7..6861eff3 100644
--- a/native-engine/datafusion-ext-plans/Cargo.toml
+++ b/native-engine/datafusion-ext-plans/Cargo.toml
@@ -66,6 +66,8 @@ unchecked-index = { workspace = true }
 prost = { workspace = true }
 prost-types = { workspace = true }
 prost-reflect = { workspace = true }
+rdkafka = { workspace = true }
+sonic-rs = { workspace = true }
 
 [target.'cfg(target_os = "linux")'.dependencies]
 procfs = { workspace = true }
diff --git a/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs b/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs
new file mode 100644
index 00000000..0840b3d7
--- /dev/null
+++ b/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs
@@ -0,0 +1,487 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::{any::Any, collections::HashMap, env, fmt::Formatter, fs, sync::Arc};
+
+use arrow::array::{
+    ArrayBuilder, BinaryArray, BinaryBuilder, Int32Array, Int32Builder, Int64Array, Int64Builder,
+    RecordBatch,
+};
+use arrow_schema::{DataType, Field, Schema, SchemaRef};
+use auron_jni_bridge::{jni_call_static, jni_get_string, jni_new_string};
+use datafusion::{
+    common::{DataFusionError, Statistics},
+    error::Result,
+    execution::TaskContext,
+    logical_expr::UserDefinedLogicalNode,
+    physical_expr::{EquivalenceProperties, Partitioning::UnknownPartitioning},
+    physical_plan::{
+        DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream,
+        execution_plan::{Boundedness, EmissionType},
+        metrics::{ExecutionPlanMetricsSet, MetricsSet},
+    },
+};
+use futures::StreamExt;
+use once_cell::sync::OnceCell;
+use rdkafka::{
+    ClientConfig, ClientContext, Offset, TopicPartitionList,
+    config::RDKafkaLogLevel,
+    consumer::{Consumer, ConsumerContext, Rebalance, StreamConsumer},
+    error::KafkaResult,
+};
+use sonic_rs::{JsonContainerTrait, JsonValueTrait};
+
+use crate::{
+    common::{column_pruning::ExecuteWithColumnPruning, execution_context::ExecutionContext},
+    flink::serde::{flink_deserializer::FlinkDeserializer, pb_deserializer::PbDeserializer},
+    rdkafka::Message,
+};
+
+struct CustomContext;
+
+impl ClientContext for CustomContext {}
+
+impl ConsumerContext for CustomContext {
+    fn pre_rebalance(&self, rebalance: &Rebalance) {
+        log::info!("Kafka Pre re-balance {rebalance:?}");
+    }
+
+    fn post_rebalance(&self, rebalance: &Rebalance) {
+        log::info!("Kafka Post re-balance {rebalance:?}");
+    }
+
+    fn commit_callback(&self, result: KafkaResult<()>, _offsets: &TopicPartitionList) {
+        log::info!("Kafka Committing offsets: {result:?}");
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct KafkaScanExec {
+    kafka_topic: String,
+    kafka_properties_json: String,
+    schema: SchemaRef,
+    batch_size: i32,
+    startup_mode: i32,
+    auron_operator_id: String,
+    data_format: i32,
+    format_config_json: String,
+    metrics: ExecutionPlanMetricsSet,
+    props: OnceCell<PlanProperties>,
+}
+
+impl KafkaScanExec {
+    pub fn new(
+        kafka_topic: String,
+        kafka_properties_json: String,
+        schema: SchemaRef,
+        batch_size: i32,
+        startup_mode: i32,
+        auron_operator_id: String,
+        data_format: i32,
+        format_config_json: String,
+    ) -> Self {
+        Self {
+            kafka_topic,
+            kafka_properties_json,
+            schema,
+            batch_size,
+            startup_mode,
+            auron_operator_id,
+            data_format,
+            format_config_json,
+            metrics: ExecutionPlanMetricsSet::new(),
+            props: OnceCell::new(),
+        }
+    }
+
+    fn execute_with_ctx(
+        &self,
+        exec_ctx: Arc<ExecutionContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let serialized_pb_stream = read_serialized_records_from_kafka(
+            exec_ctx.clone(),
+            self.kafka_topic.clone(),
+            self.kafka_properties_json.clone(),
+            self.batch_size as usize,
+            self.startup_mode,
+            self.auron_operator_id.clone(),
+            self.data_format,
+            self.format_config_json.clone(),
+        )?;
+
+        let deserialized_pb_stream = parse_records(
+            exec_ctx.output_schema(),
+            exec_ctx.clone(),
+            serialized_pb_stream,
+            self.format_config_json.clone(),
+        )?;
+        Ok(deserialized_pb_stream)
+    }
+}
+
+impl DisplayAs for KafkaScanExec {
+    fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
+        write!(f, "KafkaScanExec")
+    }
+}
+
+impl ExecutionPlan for KafkaScanExec {
+    fn name(&self) -> &str {
+        "KafkaScanExec"
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        self.props.get_or_init(|| {
+            PlanProperties::new(
+                EquivalenceProperties::new(self.schema()),
+                UnknownPartitioning(1),
+                EmissionType::Both,
+                Boundedness::Unbounded {
+                    requires_infinite_memory: false,
+                },
+            )
+        })
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(Self::new(
+            self.kafka_topic.clone(),
+            self.kafka_properties_json.clone(),
+            self.schema.clone(),
+            self.batch_size,
+            self.startup_mode,
+            self.auron_operator_id.clone(),
+            self.data_format,
+            self.format_config_json.clone(),
+        )))
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let exec_ctx = ExecutionContext::new(context, partition, self.schema(), &self.metrics);
+        self.execute_with_ctx(exec_ctx)
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+
+    fn statistics(&self) -> Result<Statistics> {
+        todo!()
+    }
+}
+
+impl ExecuteWithColumnPruning for KafkaScanExec {
+    fn execute_projected(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+        projection: &[usize],
+    ) -> Result<SendableRecordBatchStream> {
+        let projected_schema = Arc::new(self.schema().project(projection)?);
+        let exec_ctx =
+            ExecutionContext::new(context, partition, projected_schema.clone(), &self.metrics);
+        self.execute_with_ctx(exec_ctx)
+    }
+}
+
+// A type alias with your custom consumer can be created for convenience.
+type LoggingConsumer = StreamConsumer<CustomContext>;
+
+fn read_serialized_records_from_kafka(
+    exec_ctx: Arc<ExecutionContext>,
+    kafka_topic: String,
+    kafka_properties_json: String,
+    batch_size: usize,
+    startup_mode: i32,
+    auron_operator_id: String,
+    data_format: i32,
+    format_config_json: String,
+) -> Result<SendableRecordBatchStream> {
+    let context = CustomContext;
+    // get source json string from jni bridge resource
+    let resource_id = jni_new_string!(&auron_operator_id)?;
+    let kafka_task_json_java =
+        jni_call_static!(JniBridge.getResource(resource_id.as_obj()) -> JObject)?;
+    let kafka_task_json = jni_get_string!(kafka_task_json_java.as_obj().into())
+        .expect("kafka_task_json_java is not valid java string");
+    let task_json = sonic_rs::from_str::<sonic_rs::Value>(&kafka_task_json)
+        .expect("source_json_str is not valid json");
+    let num_readers = task_json
+        .get("num_readers")
+        .as_i64()
+        .expect("num_readers is not valid json") as i32;
+    let subtask_index = task_json
+        .get("subtask_index")
+        .as_i64()
+        .expect("subtask_index is not valid json") as i32;
+    let kafka_properties = sonic_rs::from_str::<sonic_rs::Value>(&kafka_properties_json)
+        .expect("kafka_properties_json is not valid json");
+    let mut config = ClientConfig::new();
+    config.set_log_level(RDKafkaLogLevel::Info);
+    kafka_properties
+        .into_object()
+        .expect("kafka_properties is not valid json")
+        .iter_mut()
+        .for_each(|(key, value)| {
+            config.set(
+                key,
+                value
+                    .as_str()
+                    .expect("kafka property value is not valid json string"),
+            );
+        });
+
+    let consumer: Arc<LoggingConsumer> = Arc::new(
+        config
+            .create_with_context(context)
+            .expect("Kafka Consumer creation failed"),
+    );
+    let metadata = consumer
+        .fetch_metadata(Some(&kafka_topic), Some(std::time::Duration::from_secs(5)))
+        .expect("Failed to fetch kafka metadata");
+
+    // get topic metadata
+    let topic_metadata = metadata
+        .topics()
+        .iter()
+        .find(|t| t.name() == kafka_topic)
+        .expect("Topic not found");
+
+    // get partition metadata
+    let partitions: Vec<i32> = topic_metadata
+        .partitions()
+        .iter()
+        .filter(|p| {
+            flink_kafka_partition_assign(kafka_topic.clone(), p.id(), num_readers)
+                .expect("flink_kafka_partition_assign failed")
+                == subtask_index
+        })
+        .map(|p| p.id())
+        .collect();
+
+    if partitions.is_empty() {
+        return Err(DataFusionError::Execution(format!(
+            "No partitions found for topic: {kafka_topic}"
+        )));
+    }
+
+    // GROUP_OFFSET = 0;
+    // EARLIEST = 1;
+    // LATEST = 2;
+    // TIMESTAMP = 3;
+    let offset = match startup_mode {
+        0 => Offset::Stored,
+        1 => Offset::Beginning,
+        2 => Offset::End,
+        _ => {
+            return Err(DataFusionError::Execution(format!(
+                "Invalid startup mode: {startup_mode}"
+            )));
+        }
+    };
+
+    log::info!("Subtask {subtask_index} consumed partitions {partitions:?}");
+    let mut partition_list = TopicPartitionList::with_capacity(partitions.len());
+    for partition in partitions.iter() {
+        partition_list.add_partition_offset(&kafka_topic, *partition, offset);
+    }
+    consumer
+        .assign(&partition_list)
+        .expect("Can't assign partitions to consumer");
+
+    let output_schema = Arc::new(Schema::new(vec![
+        Field::new("serialized_kafka_records_partition", DataType::Int32, false),
+        Field::new("serialized_kafka_records_offset", DataType::Int64, false),
+        Field::new("serialized_kafka_records_timestamp", DataType::Int64, false),
+        Field::new("serialized_pb_records", DataType::Binary, false),
+    ]));
+
+    Ok(exec_ctx
+        .with_new_output_schema(output_schema.clone())
+        .output_with_sender("KafkaScanExec.KafkaConsumer", move |sender| async move {
+            let mut serialized_kafka_records_partition_builder = Int32Builder::with_capacity(0);
+            let mut serialized_kafka_records_offset_builder = Int64Builder::with_capacity(0);
+            let mut serialized_kafka_records_timestamp_builder = Int64Builder::with_capacity(0);
+            let mut serialized_pb_records_builder = BinaryBuilder::with_capacity(batch_size, 0);
+            loop {
+                while serialized_pb_records_builder.len() < batch_size {
+                    match consumer.recv().await {
+                        Err(e) => log::warn!("Kafka error: {e}"),
+                        Ok(msg) => {
+                            if let Some(payload) = msg.payload() {
+                                serialized_kafka_records_partition_builder
+                                    .append_value(msg.partition());
+                                serialized_kafka_records_offset_builder.append_value(msg.offset());
+                                serialized_kafka_records_timestamp_builder
+                                    .append_option(msg.timestamp().to_millis());
+                                serialized_pb_records_builder.append_value(payload);
+                            }
+                        }
+                    }
+                }
+                let batch = RecordBatch::try_new(
+                    output_schema.clone(),
+                    vec![
+                        Arc::new(serialized_kafka_records_partition_builder.finish()),
+                        Arc::new(serialized_kafka_records_offset_builder.finish()),
+                        Arc::new(serialized_kafka_records_timestamp_builder.finish()),
+                        Arc::new(serialized_pb_records_builder.finish()),
+                    ],
+                )?;
+                sender.send(batch).await;
+            }
+        }))
+}
+
+fn parse_records(
+    schema: SchemaRef,
+    exec_ctx: Arc<ExecutionContext>,
+    mut input_stream: SendableRecordBatchStream,
+    parser_config_json: String,
+) -> Result<SendableRecordBatchStream> {
+    let parser_config = sonic_rs::from_str::<sonic_rs::Value>(&parser_config_json)
+        .expect("parser_config_json is not valid json");
+    let pb_desc_file = parser_config
+        .get("pb_desc_file")
+        .and_then(|v| v.as_str())
+        .expect("pb_desc_file is not valid string")
+        .to_string();
+    let root_message_name = parser_config
+        .get("root_message_name")
+        .and_then(|v| v.as_str())
+        .expect("root_message_name is not valid string")
+        .to_string();
+    let skip_fields = parser_config
+        .get("skip_fields")
+        .and_then(|v| v.as_str())
+        .expect("skip_fields is not valid string")
+        .to_string();
+    let nested_col_mapping_json = parser_config
+        .get("nested_col_mapping")
+        .expect("nested_col_mapping is not valid json");
+    let mut nested_msg_mapping: HashMap<String, String> = HashMap::new();
+    if let Some(obj) = nested_col_mapping_json.as_object() {
+        for (key, value) in obj {
+            if let Some(val_str) = value.as_str() {
+                nested_msg_mapping.insert(key.to_string(), val_str.to_string());
+            }
+        }
+    }
+
+    let local_pb_desc_file = env::var("PWD").expect("PWD env var is not set") + "/" + &pb_desc_file;
+    log::info!("load desc from {local_pb_desc_file}");
+    let file_descriptor_bytes = fs::read(local_pb_desc_file).expect("Failed to read file");
+    let skip_fields_vec: Vec<String> = skip_fields
+        .split(",")
+        .map(|s| s.to_string())
+        .collect::<Vec<String>>();
+    Ok(exec_ctx.clone().output_with_sender(
+        "KafkaScanExec.ParseRecords",
+        move |sender| async move {
+            // TODO: json parser
+            let mut parser: Box<dyn FlinkDeserializer> = Box::new(PbDeserializer::new(
+                &file_descriptor_bytes,
+                &root_message_name,
+                schema.clone(),
+                &nested_msg_mapping,
+                &skip_fields_vec,
+            )?);
+            while let Some(batch) = input_stream.next().await.transpose()? {
+                let kafka_partition = batch
+                    .column(0)
+                    .as_any()
+                    .downcast_ref::<Int32Array>()
+                    .expect("input must be Int32Array");
+                let kafka_offset = batch
+                    .column(1)
+                    .as_any()
+                    .downcast_ref::<Int64Array>()
+                    .expect("input must be Int64Array");
+                let kafka_timestamp = batch
+                    .column(2)
+                    .as_any()
+                    .downcast_ref::<Int64Array>()
+                    .expect("input must be Int64Array");
+                let records = batch
+                    .column(3)
+                    .as_any()
+                    .downcast_ref::<BinaryArray>()
+                    .expect("input must be BinaryArray");
+                let output_batch = parser.parse_messages_with_kafka_meta(
+                    &records,
+                    &kafka_partition,
+                    &kafka_offset,
+                    &kafka_timestamp,
+                )?;
+                sender.send(output_batch).await;
+            }
+            #[allow(unreachable_code)]
+            Ok(())
+        },
+    ))
+}
+
+fn java_string_hashcode(s: &str) -> i32 {
+    let mut hash: i32 = 0;
+    for c in s.chars() {
+        let mut buf = [0; 2];
+        let encoded = c.encode_utf16(&mut buf);
+        for code_unit in encoded.iter().cloned() {
+            hash = hash.wrapping_mul(31).wrapping_add(code_unit as i32);
+        }
+    }
+    hash
+}
+
+fn flink_kafka_partition_assign(topic: String, partition_id: i32, num_readers: i32) -> Result<i32> {
+    if num_readers <= 0 {
+        return Err(DataFusionError::Execution(format!(
+            "num_readers must be positive: {num_readers}"
+        )));
+    }
+    // Java hashcode
+    let hash_code = java_string_hashcode(&topic);
+    let start_index = (hash_code.wrapping_mul(31) & i32::MAX) % num_readers;
+    Ok((start_index + partition_id).rem_euclid(num_readers))
+}
+
+#[test]
+fn test_flink_kafka_partition_assign() {
+    let topic = "flink_test_topic".to_string();
+    let partition_id = 0;
+    let num_readers = 1000;
+    // the result same with flink
+    let result = flink_kafka_partition_assign(topic, partition_id, num_readers);
+    assert_eq!(result.expect("Error assigning partition"), 471);
+}
diff --git a/native-engine/datafusion-ext-plans/src/flink/mod.rs b/native-engine/datafusion-ext-plans/src/flink/mod.rs
index fb72d405..359b29f5 100644
--- a/native-engine/datafusion-ext-plans/src/flink/mod.rs
+++ b/native-engine/datafusion-ext-plans/src/flink/mod.rs
@@ -13,4 +13,5 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+pub mod kafka_scan_exec;
 pub mod serde;
diff --git a/native-engine/datafusion-ext-plans/src/flink/serde/flink_deserializer.rs b/native-engine/datafusion-ext-plans/src/flink/serde/flink_deserializer.rs
index 6d245ff2..3b8bcfde 100644
--- a/native-engine/datafusion-ext-plans/src/flink/serde/flink_deserializer.rs
+++ b/native-engine/datafusion-ext-plans/src/flink/serde/flink_deserializer.rs
@@ -17,7 +17,7 @@ use arrow::array::{BinaryArray, Int32Array, Int64Array, RecordBatch};
 
 /// FlinkDeserializer is used to deserialize messages from kafka.
 /// Supports Protobuf, JSON, etc.
-pub trait FlinkDeserializer {
+pub trait FlinkDeserializer: Send {
     /// Parse messages from kafka, including kafka metadata such as partitions,
     /// offsets, and timestamps.
     fn parse_messages_with_kafka_meta(
diff --git a/native-engine/datafusion-ext-plans/src/lib.rs b/native-engine/datafusion-ext-plans/src/lib.rs
index d114c27a..c6339e14 100644
--- a/native-engine/datafusion-ext-plans/src/lib.rs
+++ b/native-engine/datafusion-ext-plans/src/lib.rs
@@ -28,6 +28,8 @@ extern crate datafusion;
 extern crate datafusion_ext_commons;
 extern crate prost;
 extern crate prost_reflect;
+extern crate rdkafka;
+extern crate sonic_rs;
 
 // execution plan implementations
 pub mod agg;

Reply via email to