This is an automated email from the ASF dual-hosted git repository.

richox pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git


The following commit(s) were added to refs/heads/master by this push:
     new 72232d57 [AURON #1848] Introduce native protobuf deserializer (#2069)
72232d57 is described below

commit 72232d57e03b0bee57341f282ce76ed39aac2303
Author: zhangmang <[email protected]>
AuthorDate: Wed Mar 4 17:51:45 2026 +0800

    [AURON #1848] Introduce native protobuf deserializer (#2069)
    
    # Which issue does this PR close?
    Closes #1848
    
    # Rationale for this change
    * Supports deserialize Protobuf data
    
    # What changes are included in this PR?
    * add flink_deserializer.rs
    * pb_deserializer.rs
    * shared_array_builder.rs
    * shared_list_array_builder.rs
    * shared_map_array_builder.rs
    * shared_struct_array_builder.rs
    * add prost-types and prost-reflect
    
    # Are there any user-facing changes?
    * No
    # How was this patch tested?
    * pb_deserializer#test_parse_messages_with_kafka_meta_basic
    * pb_deserializer#test_parse_messages_with_kafka_meta_nested
    * pb_deserializer#test_parse_messages_with_kafka_meta_empty
    *
    pb_deserializer#test_parse_messages_with_kafka_meta_different_partitions
---
 Cargo.lock                                         |   26 +-
 Cargo.toml                                         |    2 +
 native-engine/datafusion-ext-plans/Cargo.toml      |    3 +
 .../datafusion-ext-plans/src/flink/mod.rs          |   16 +
 .../src/flink/serde/flink_deserializer.rs          |   30 +
 .../datafusion-ext-plans/src/flink/serde/mod.rs    |   21 +
 .../src/flink/serde/pb_deserializer.rs             | 2151 ++++++++++++++++++++
 .../src/flink/serde/shared_array_builder.rs        |   85 +
 .../src/flink/serde/shared_list_array_builder.rs   |  147 ++
 .../src/flink/serde/shared_map_array_builder.rs    |  243 +++
 .../src/flink/serde/shared_struct_array_builder.rs |  143 ++
 native-engine/datafusion-ext-plans/src/lib.rs      |    9 +
 12 files changed, 2874 insertions(+), 2 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 7e3be0ed..1ac2b500 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1267,6 +1267,9 @@ dependencies = [
  "parking_lot",
  "paste",
  "procfs",
+ "prost 0.14.3",
+ "prost-reflect",
+ "prost-types 0.14.3",
  "rand",
  "smallvec 2.0.0-alpha.11",
  "tokio",
@@ -3167,7 +3170,7 @@ dependencies = [
  "petgraph 0.7.1",
  "prettyplease",
  "prost 0.13.5",
- "prost-types",
+ "prost-types 0.13.5",
  "regex",
  "syn 2.0.104",
  "tempfile",
@@ -3199,6 +3202,16 @@ dependencies = [
  "syn 2.0.104",
 ]
 
+[[package]]
+name = "prost-reflect"
+version = "0.16.3"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "b89455ef41ed200cafc47c76c552ee7792370ac420497e551f16123a9135f76e"
+dependencies = [
+ "prost 0.14.3",
+ "prost-types 0.14.3",
+]
+
 [[package]]
 name = "prost-types"
 version = "0.13.5"
@@ -3208,6 +3221,15 @@ dependencies = [
  "prost 0.13.5",
 ]
 
+[[package]]
+name = "prost-types"
+version = "0.14.3"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "8991c4cbdb8bc5b11f0b074ffe286c30e523de90fee5ba8132f1399f23cb3dd7"
+dependencies = [
+ "prost 0.14.3",
+]
+
 [[package]]
 name = "protobuf"
 version = "3.7.2"
@@ -4191,7 +4213,7 @@ dependencies = [
  "prettyplease",
  "proc-macro2",
  "prost-build",
- "prost-types",
+ "prost-types 0.13.5",
  "quote",
  "syn 2.0.104",
 ]
diff --git a/Cargo.toml b/Cargo.toml
index cd73e8b0..a59cd1d5 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -176,6 +176,8 @@ parking_lot = "0.12.5"
 paste = "1.0.15"
 procfs = "0.18.0"
 prost = "0.14.3"
+prost-types = "0.14.3"
+prost-reflect = "0.16.3"
 rand = "0.9.2"
 smallvec = "2.0.0-alpha.11"
 sonic-rs = "0.5.7"
diff --git a/native-engine/datafusion-ext-plans/Cargo.toml 
b/native-engine/datafusion-ext-plans/Cargo.toml
index bdc55616..88ca18d7 100644
--- a/native-engine/datafusion-ext-plans/Cargo.toml
+++ b/native-engine/datafusion-ext-plans/Cargo.toml
@@ -63,6 +63,9 @@ paste = { workspace = true }
 smallvec = { workspace = true }
 tokio = { workspace = true }
 unchecked-index = { workspace = true }
+prost = { workspace = true }
+prost-types = { workspace = true }
+prost-reflect = { workspace = true }
 
 [target.'cfg(target_os = "linux")'.dependencies]
 procfs = { workspace = true }
diff --git a/native-engine/datafusion-ext-plans/src/flink/mod.rs 
b/native-engine/datafusion-ext-plans/src/flink/mod.rs
new file mode 100644
index 00000000..fb72d405
--- /dev/null
+++ b/native-engine/datafusion-ext-plans/src/flink/mod.rs
@@ -0,0 +1,16 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+pub mod serde;
diff --git 
a/native-engine/datafusion-ext-plans/src/flink/serde/flink_deserializer.rs 
b/native-engine/datafusion-ext-plans/src/flink/serde/flink_deserializer.rs
new file mode 100644
index 00000000..6d245ff2
--- /dev/null
+++ b/native-engine/datafusion-ext-plans/src/flink/serde/flink_deserializer.rs
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use arrow::array::{BinaryArray, Int32Array, Int64Array, RecordBatch};
+
+/// FlinkDeserializer is used to deserialize messages from kafka.
+/// Supports Protobuf, JSON, etc.
+pub trait FlinkDeserializer {
+    /// Parse messages from kafka, including kafka metadata such as partitions,
+    /// offsets, and timestamps.
+    fn parse_messages_with_kafka_meta(
+        &mut self,
+        messages: &BinaryArray,
+        kafka_partition: &Int32Array,
+        kafka_offset: &Int64Array,
+        kafka_timestamp: &Int64Array,
+    ) -> datafusion::common::Result<RecordBatch>;
+}
diff --git a/native-engine/datafusion-ext-plans/src/flink/serde/mod.rs 
b/native-engine/datafusion-ext-plans/src/flink/serde/mod.rs
new file mode 100644
index 00000000..91d553d1
--- /dev/null
+++ b/native-engine/datafusion-ext-plans/src/flink/serde/mod.rs
@@ -0,0 +1,21 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+pub mod flink_deserializer;
+pub mod pb_deserializer;
+pub mod shared_array_builder;
+pub mod shared_list_array_builder;
+pub mod shared_map_array_builder;
+pub mod shared_struct_array_builder;
diff --git 
a/native-engine/datafusion-ext-plans/src/flink/serde/pb_deserializer.rs 
b/native-engine/datafusion-ext-plans/src/flink/serde/pb_deserializer.rs
new file mode 100644
index 00000000..3c4ea4f4
--- /dev/null
+++ b/native-engine/datafusion-ext-plans/src/flink/serde/pb_deserializer.rs
@@ -0,0 +1,2151 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::{
+    any::Any,
+    cell::UnsafeCell,
+    collections::{HashMap, HashSet},
+    io::Cursor,
+    sync::Arc,
+};
+
+use arrow::array::{
+    Array, ArrayBuilder, ArrayRef, BinaryArray, BinaryBuilder, BooleanBuilder, 
Float32Builder,
+    Float64Builder, Int32Array, Int32Builder, Int64Array, Int64Builder, 
RecordBatch,
+    RecordBatchOptions, StringBuilder, StructArray, 
TimestampMillisecondBuilder, UInt32Builder,
+    UInt64Builder, new_null_array,
+};
+use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaRef, 
TimeUnit};
+use bytes::Buf;
+use datafusion::{
+    common::ExprSchema, error::DataFusionError, 
logical_expr::UserDefinedLogicalNode,
+};
+use datafusion_ext_commons::{df_execution_err, downcast_any};
+use prost::{
+    DecodeError,
+    encoding::{DecodeContext, WireType},
+};
+use prost_reflect::{DescriptorPool, FieldDescriptor, Kind, MessageDescriptor, 
UnknownField};
+
+use crate::flink::serde::{
+    flink_deserializer::FlinkDeserializer, 
shared_array_builder::SharedArrayBuilder,
+    shared_list_array_builder::SharedListArrayBuilder,
+    shared_map_array_builder::SharedMapArrayBuilder,
+    shared_struct_array_builder::SharedStructArrayBuilder,
+};
+
+type ValueHandler =
+    Box<dyn Fn(&mut Cursor<&[u8]>, u32, WireType) -> 
datafusion::error::Result<()> + Send>;
+type ValueHandlerMap = hashbrown::HashMap<u32, ValueHandler, 
foldhash::fast::RandomState>;
+
+pub struct PbDeserializer {
+    output_schema: SchemaRef,
+    output_schema_without_meta: SchemaRef,
+    pb_schema: SchemaRef,
+    output_array_builders: Vec<SharedArrayBuilder>,
+    ensure_size: Box<dyn FnMut(usize) + Send>,
+    value_handlers: ValueHandlerMap,
+    msg_mapping: Vec<Vec<usize>>,
+}
+
+impl FlinkDeserializer for PbDeserializer {
+    fn parse_messages_with_kafka_meta(
+        &mut self,
+        messages: &BinaryArray,
+        kafka_partition: &Int32Array,
+        kafka_offset: &Int64Array,
+        kafka_timestamp: &Int64Array,
+    ) -> datafusion::common::Result<RecordBatch> {
+        let mut msg_cursors = messages
+            .iter()
+            .map(|v| {
+                let s = v.expect("message bytes must not be null");
+                Cursor::new(s)
+            })
+            .collect::<Vec<_>>();
+        for (row_idx, msg_cursor) in msg_cursors.iter_mut().enumerate() {
+            while msg_cursor.has_remaining() {
+                let (tag, wired_type) = 
prost::encoding::decode_key(msg_cursor).map_err(|e| {
+                    DataFusionError::Execution(format!("Failed to parse 
protobuf key: {e}"))
+                })?;
+                if let Some(value_handler) = self.value_handlers.get_mut(&tag) 
{
+                    value_handler(msg_cursor, tag, wired_type)?;
+                }
+            }
+            let ensure_size = &mut self.ensure_size;
+            ensure_size(row_idx + 1);
+        }
+
+        let root_struct = StructArray::from({
+            RecordBatch::try_new_with_options(
+                self.pb_schema.clone(),
+                self.output_array_builders
+                    .iter()
+                    .map(|builder| builder.get_dyn_mut().finish())
+                    .collect(),
+                
&RecordBatchOptions::new().with_row_count(Some(messages.len())),
+            )?
+        });
+        let mut output_arrays: Vec<ArrayRef> = Vec::new();
+        output_arrays.push(Arc::new(kafka_partition.clone()));
+        output_arrays.push(Arc::new(kafka_offset.clone()));
+        output_arrays.push(Arc::new(kafka_timestamp.clone()));
+        for (field_idx, field) in 
self.output_schema_without_meta.fields().iter().enumerate() {
+            let array_ref: ArrayRef = get_output_array(&root_struct, 
&self.msg_mapping[field_idx])?;
+            if array_ref.null_count() == array_ref.len() {
+                output_arrays.push(new_null_array(field.data_type(), 
array_ref.len()));
+            } else {
+                output_arrays.push(
+                    datafusion_ext_commons::arrow::cast::cast(&array_ref, 
field.data_type())
+                        .expect("Failed to cast array"),
+                );
+            }
+        }
+        let batch = RecordBatch::try_new_with_options(
+            self.output_schema.clone(),
+            output_arrays,
+            &RecordBatchOptions::new().with_row_count(Some(messages.len())),
+        )?;
+        Ok(batch)
+    }
+}
+
+impl PbDeserializer {
+    pub fn new(
+        proto_desc_data: impl AsRef<[u8]>,
+        message_name: &str,
+        output_schema: SchemaRef,
+        // Protobuf data may contain deeply nested hierarchies, supporting the 
extraction of
+        // certain fields to the topmost layer of the Flink output. 
{"flink_output_col1":
+        // "pb_field1.pb_sub_field2", "flink_output_col2":
+        // "pb_field1.pb_sub_field3.pb_sub_sub_field1"}
+        nested_msg_mapping: &HashMap<String, String>,
+        skip_fields: &[String],
+    ) -> datafusion::error::Result<Self> {
+        let pool: DescriptorPool =
+            DescriptorPool::decode(proto_desc_data.as_ref()).map_err(|e| {
+                DataFusionError::Execution(format!("Failed to parse descriptor 
file: {e}"))
+            })?;
+
+        for message in pool.all_messages() {
+            if message.name() == message_name {
+                return Self::try_new(message, output_schema, 
nested_msg_mapping, skip_fields);
+            }
+        }
+        Err(DataFusionError::Execution(format!(
+            "Message '{message_name}' not found"
+        )))
+    }
+
+    pub fn try_new(
+        message_descriptor: MessageDescriptor,
+        output_schema: SchemaRef,
+        nested_msg_mapping: &HashMap<String, String>,
+        skip_fields: &[String],
+    ) -> datafusion::error::Result<Self> {
+        // The output schema includes Kafka's meta fields, but these are 
absent in the
+        // PB data, so they must be filtered out.
+        let output_schema_without_meta = Arc::new(Schema::new(
+            output_schema
+                .fields()
+                .iter()
+                .filter(|f| {
+                    f.name() != "serialized_kafka_records_partition"
+                        && f.name() != "serialized_kafka_records_offset"
+                        && f.name() != "serialized_kafka_records_timestamp"
+                })
+                .cloned()
+                .collect::<Fields>(),
+        ));
+        // Schema inferred from the PB descriptor.
+        let pb_schema = transfer_output_schema_to_pb_schema(
+            message_descriptor.clone(),
+            &output_schema_without_meta,
+            nested_msg_mapping.clone(),
+            &skip_fields,
+        )
+        .expect("Failed to transfer output scheam to pb scheam");
+
+        let tag_to_output_mapping =
+            create_tag_to_output_mapping(message_descriptor.clone(), 
&pb_schema);
+
+        let output_array_builders =
+            create_output_array_builders(&pb_schema, 
message_descriptor.clone())?;
+        let ensure_size = 
ensure_output_array_builders_size(&output_array_builders)?;
+
+        let value_handlers = message_descriptor
+            .fields()
+            .map(|field| {
+                Ok((
+                    field.number(),
+                    create_value_handler(
+                        &message_descriptor,
+                        field.number(),
+                        &tag_to_output_mapping,
+                        &pb_schema,
+                        &output_array_builders,
+                    )?,
+                ))
+            })
+            .collect::<datafusion::error::Result<hashbrown::HashMap<_, _, 
foldhash::fast::RandomState>>>()?;
+
+        // precompute message mappings
+        let msg_mapping = output_schema_without_meta
+            .fields()
+            .iter()
+            .map(|field| {
+                let mut mapped_field_indices = vec![];
+                let mut cur_fields = pb_schema.fields();
+                if let Some(nested) = nested_msg_mapping.get(field.name()) {
+                    let nested_fields = nested.split(".").collect::<Vec<_>>();
+                    for nested_field in &nested_fields[..nested_fields.len() - 
1] {
+                        match cur_fields.find(nested_field) {
+                            Some((idx, f)) => {
+                                if let DataType::Struct(fields) = 
f.data_type() {
+                                    mapped_field_indices.push(idx);
+                                    cur_fields = fields;
+                                } else {
+                                    return df_execution_err!("nested field 
must be struct");
+                                }
+                            }
+                            _ => return df_execution_err!("nested field not 
found in pb schema"),
+                        };
+                    }
+                    if let Some((idx, _)) = 
cur_fields.find(nested_fields[nested_fields.len() - 1])
+                    {
+                        mapped_field_indices.push(idx);
+                    } else {
+                        return df_execution_err!("field not found in pb 
schema");
+                    }
+                } else if let Ok(idx) = pb_schema.index_of(field.name()) {
+                    mapped_field_indices.push(idx);
+                } else {
+                    return df_execution_err!("field not found in pb schema");
+                }
+                Ok(mapped_field_indices)
+            })
+            .collect::<datafusion::error::Result<Vec<_>>>()?;
+
+        Ok(Self {
+            output_schema,
+            output_schema_without_meta,
+            pb_schema,
+            output_array_builders,
+            ensure_size,
+            value_handlers,
+            msg_mapping,
+        })
+    }
+}
+
+fn transfer_output_schema_to_pb_schema(
+    message_descriptor: MessageDescriptor,
+    output_schema: &SchemaRef,
+    nested_msg_mapping: HashMap<String, String>,
+    skip_fields: &[String],
+) -> datafusion::error::Result<SchemaRef> {
+    let mut pb_schema_fields: Vec<Field> = vec![];
+    let mut sub_pb_nested_msg_mapping: HashMap<String, String> = 
HashMap::new();
+    let mut sub_pb_schema_mapping: HashMap<String, Vec<Field>> = 
HashMap::new();
+    // To ensure sequential processing, the output schema is used to traverse 
the
+    // data.
+    for (output_index, field) in output_schema.fields().iter().enumerate() {
+        if let Some(pb_nested_msg_name) = nested_msg_mapping.get(field.name()) 
{
+            let index_start = pb_nested_msg_name.find(".");
+            if let Some(index) = index_start {
+                sub_pb_nested_msg_mapping.insert(
+                    field.name().to_string(),
+                    pb_nested_msg_name[(index + 1)..].to_string(),
+                );
+                sub_pb_schema_mapping
+                    .entry(pb_nested_msg_name[..index].to_string())
+                    .and_modify(|v| {
+                        v.push(field.as_ref().clone());
+                    })
+                    .or_insert(vec![field.as_ref().clone()]);
+            }
+        }
+    }
+    let mut msg_set: HashSet<String> = HashSet::new();
+    for (index, field) in output_schema.fields().iter().enumerate() {
+        if let Some(field_name) = nested_msg_mapping.get(field.name()) {
+            let index_start = field_name.find(".");
+            if let Some(index) = index_start {
+                let msg_field_name = &field_name[..index];
+                let msg_field_desc =
+                    message_descriptor
+                        .get_field_by_name(msg_field_name)
+                        .expect(&format!(
+                            "nested field {msg_field_name} not exits in 
message_descriptor"
+                        ));
+                if let Kind::Message(sub_message_desc) = msg_field_desc.kind() 
{
+                    if !msg_set.contains(msg_field_name) {
+                        let sub_fields = sub_pb_schema_mapping
+                            .get(msg_field_name)
+                            .ok_or_else(|| {
+                                DataFusionError::Execution(format!(
+                                    "Field {msg_field_name} not found in 
sub_pb_schema_mapping"
+                                ))
+                            })?
+                            .clone();
+                        let sub_pb_schema = 
transfer_output_schema_to_pb_schema(
+                            sub_message_desc.clone(),
+                            &Arc::new(Schema::new(sub_fields)),
+                            sub_pb_nested_msg_mapping.clone(),
+                            skip_fields,
+                        )
+                        .expect("transfer_output_schema_to_pb_schema failed");
+                        pb_schema_fields.push(Field::new(
+                            msg_field_name,
+                            DataType::Struct(sub_pb_schema.fields.clone()),
+                            true,
+                        ));
+                        msg_set.insert(msg_field_name.to_string());
+                    }
+                } else {
+                    return df_execution_err!("not message field");
+                }
+            } else {
+                let msg_field_desc =
+                    message_descriptor
+                        .get_field_by_name(field_name)
+                        .expect(&format!(
+                            "nested innermost field {field_name} not exits in 
message_descriptor"
+                        ));
+                
pb_schema_fields.push(create_arrow_field(msg_field_desc.clone(), skip_fields));
+            }
+        } else {
+            let msg_field_desc = message_descriptor
+                .get_field_by_name(field.name())
+                .expect(&format!("{} not exits in message_descriptor", 
field.name()));
+            pb_schema_fields.push(create_arrow_field(msg_field_desc.clone(), 
skip_fields));
+        }
+    }
+    Ok(Arc::new(Schema::new(pb_schema_fields)))
+}
+
+fn create_arrow_field(field_desc: FieldDescriptor, skip_fields: &[String]) -> 
Field {
+    Field::new(
+        field_desc.name(),
+        convert_pb_type_to_arrow(
+            field_desc.kind(),
+            field_desc.is_list(),
+            field_desc.is_map(),
+            field_desc.name(),
+            skip_fields,
+        )
+        .expect("convert_pb_type_to_arrow failed"),
+        true, // TODO: is_nullable
+    )
+}
+
+fn convert_pb_type_to_arrow(
+    field_kind: Kind,
+    is_list: bool,
+    is_map: bool,
+    field_name: &str,
+    skip_fields: &[String],
+) -> datafusion::error::Result<DataType> {
+    match field_kind {
+        Kind::Bool => {
+            if is_list {
+                Ok(DataType::List(create_arrow_field_ref(
+                    field_name,
+                    DataType::Boolean,
+                    true,
+                )))
+            } else {
+                Ok(DataType::Boolean)
+            }
+        }
+        Kind::String => {
+            if is_list {
+                Ok(DataType::List(create_arrow_field_ref(
+                    field_name,
+                    DataType::Utf8,
+                    true,
+                )))
+            } else {
+                Ok(DataType::Utf8)
+            }
+        }
+        Kind::Bytes => {
+            if is_list {
+                Ok(DataType::List(create_arrow_field_ref(
+                    field_name,
+                    DataType::Binary,
+                    true,
+                )))
+            } else {
+                Ok(DataType::Binary)
+            }
+        }
+        Kind::Float => {
+            if is_list {
+                Ok(DataType::List(create_arrow_field_ref(
+                    field_name,
+                    DataType::Float32,
+                    true,
+                )))
+            } else {
+                Ok(DataType::Float32)
+            }
+        }
+        Kind::Double => {
+            if is_list {
+                Ok(DataType::List(create_arrow_field_ref(
+                    field_name,
+                    DataType::Float64,
+                    true,
+                )))
+            } else {
+                Ok(DataType::Float64)
+            }
+        }
+        Kind::Int32 => {
+            if is_list {
+                Ok(DataType::List(create_arrow_field_ref(
+                    field_name,
+                    DataType::Int32,
+                    true,
+                )))
+            } else {
+                Ok(DataType::Int32)
+            }
+        }
+        Kind::Int64 => {
+            if is_list {
+                Ok(DataType::List(create_arrow_field_ref(
+                    field_name,
+                    DataType::Int64,
+                    true,
+                )))
+            } else {
+                Ok(DataType::Int64)
+            }
+        }
+        Kind::Uint32 => {
+            if is_list {
+                Ok(DataType::List(create_arrow_field_ref(
+                    field_name,
+                    DataType::UInt32,
+                    true,
+                )))
+            } else {
+                Ok(DataType::UInt32)
+            }
+        }
+        Kind::Uint64 => {
+            if is_list {
+                Ok(DataType::List(create_arrow_field_ref(
+                    field_name,
+                    DataType::UInt64,
+                    true,
+                )))
+            } else {
+                Ok(DataType::UInt64)
+            }
+        }
+        Kind::Enum(enum_descriptor) => {
+            // Enum to get the Name, so use String.
+            if is_list {
+                Ok(DataType::List(create_arrow_field_ref(
+                    field_name,
+                    DataType::Utf8,
+                    true,
+                )))
+            } else {
+                Ok(DataType::Utf8)
+            }
+        }
+        Kind::Message(message_descriptor) => {
+            if is_map {
+                Ok(DataType::Map(
+                    Arc::new(Field::new(
+                        "entries",
+                        DataType::Struct(Fields::from(
+                            message_descriptor
+                                .fields()
+                                .filter(|field| {
+                                    
!skip_fields.contains(&field.full_name().to_string())
+                                })
+                                .map(|field| create_arrow_field(field, 
skip_fields))
+                                .collect::<Vec<Field>>(),
+                        )),
+                        false,
+                    )),
+                    false,
+                ))
+            } else if is_list {
+                Ok(DataType::List(create_arrow_field_ref(
+                    field_name,
+                    DataType::Struct(Fields::from(
+                        message_descriptor
+                            .fields()
+                            .filter(|field| 
!skip_fields.contains(&field.full_name().to_string()))
+                            .map(|field| create_arrow_field(field, 
skip_fields))
+                            .collect::<Vec<Field>>(),
+                    )),
+                    true,
+                )))
+            } else {
+                Ok(DataType::Struct(Fields::from(
+                    message_descriptor
+                        .fields()
+                        .filter(|field| 
!skip_fields.contains(&field.full_name().to_string()))
+                        .map(|field| create_arrow_field(field, skip_fields))
+                        .collect::<Vec<Field>>(),
+                )))
+            }
+        }
+        other => {
+            return Err(DataFusionError::NotImplemented(format!(
+                "Unsupported data type for Arrow conversion: {other:?}"
+            )));
+        }
+    }
+}
+
+fn create_arrow_field_ref(field_name: &str, data_type: DataType, is_nullable: 
bool) -> FieldRef {
+    Arc::new(Field::new(field_name, data_type, is_nullable))
+}
+
+fn create_tag_to_output_mapping(
+    message_descriptor: MessageDescriptor,
+    output_schema: &SchemaRef,
+) -> HashMap<u32, usize> {
+    let mut tag_to_output_index = HashMap::new();
+
+    for field in message_descriptor.fields() {
+        if let Some(output_index) = output_schema
+            .fields()
+            .iter()
+            .position(|f| f.name() == field.name())
+        {
+            tag_to_output_index.insert(field.number(), output_index);
+        }
+    }
+    tag_to_output_index
+}
+
+fn create_output_array_builders(
+    schema: &SchemaRef,
+    message_descriptor: MessageDescriptor,
+) -> datafusion::error::Result<Vec<SharedArrayBuilder>> {
+    let mut array_builders: Vec<SharedArrayBuilder> = vec![];
+    for field in schema.fields() {
+        let field_name = field.name();
+        let field_desc = message_descriptor
+            .get_field_by_name(field_name)
+            .expect(&format!(
+                "Field {field_name} not exits in message_descriptor",
+            ));
+        match field.data_type() {
+            DataType::Boolean => {
+                
array_builders.push(SharedArrayBuilder::new(BooleanBuilder::new()));
+            }
+            DataType::Int32 => {
+                
array_builders.push(SharedArrayBuilder::new(Int32Builder::new()));
+            }
+            DataType::Int64 => {
+                
array_builders.push(SharedArrayBuilder::new(Int64Builder::new()));
+            }
+            DataType::Utf8 => {
+                
array_builders.push(SharedArrayBuilder::new(StringBuilder::new()));
+            }
+            DataType::Float32 => {
+                
array_builders.push(SharedArrayBuilder::new(Float32Builder::new()));
+            }
+            DataType::Float64 => {
+                
array_builders.push(SharedArrayBuilder::new(Float64Builder::new()));
+            }
+            DataType::UInt32 => {
+                
array_builders.push(SharedArrayBuilder::new(UInt32Builder::new()));
+            }
+            DataType::UInt64 => {
+                
array_builders.push(SharedArrayBuilder::new(UInt64Builder::new()));
+            }
+            DataType::Timestamp(TimeUnit::Millisecond, _) => {
+                
array_builders.push(SharedArrayBuilder::new(TimestampMillisecondBuilder::new()));
+            }
+            DataType::Binary => {
+                
array_builders.push(SharedArrayBuilder::new(BinaryBuilder::new()));
+            }
+            DataType::Struct(fields) => {
+                let field_kind = field_desc.kind();
+                let sub_msg_desc = field_kind.as_message().expect("as_message 
failed");
+                let struct_builder = create_output_array_builders(
+                    &Arc::new(Schema::new(fields.clone())),
+                    sub_msg_desc.clone(),
+                )
+                .expect("struct create_output_array_builders failed");
+                
array_builders.push(SharedArrayBuilder::new(SharedStructArrayBuilder::new(
+                    fields.clone(),
+                    struct_builder,
+                )));
+            }
+            DataType::Map(field_ref, boolean) => {
+                let field_kind = field_desc.kind();
+                let sub_msg_desc = field_kind.as_message().expect("map 
as_message failed");
+                if let DataType::Struct(fields) = field_ref.data_type() {
+                    
array_builders.push(SharedArrayBuilder::new(SharedMapArrayBuilder::new(
+                        None,
+                        create_shared_array_builder_by_data_type(
+                            fields.get(0).expect("get 0 
failed").data_type().clone(),
+                            sub_msg_desc.get_field(1).expect("get map key 
failed"),
+                        )
+                        .expect("map create_shared_array_builder_by_data_type 
failed"),
+                        create_shared_array_builder_by_data_type(
+                            fields.get(1).expect("get 1 
failed").data_type().clone(),
+                            sub_msg_desc.get_field(2).expect("get map key 
failed"),
+                        )
+                        .expect("map create_shared_array_builder_by_data_type 
failed"),
+                    )));
+                } else {
+                    return Err(DataFusionError::NotImplemented(format!(
+                        "Unsupported Map data type for Arrow conversion: 
{field_ref:?}"
+                    )));
+                }
+            }
+            DataType::List(field_ref) => {
+                
array_builders.push(SharedArrayBuilder::new(SharedListArrayBuilder::new(
+                    create_shared_array_builder_by_data_type(
+                        field_ref.data_type().clone(),
+                        field_desc,
+                    )
+                    .expect("List create_shared_array_builder_by_data_type 
failed"),
+                )));
+            }
+            other => {
+                return Err(DataFusionError::NotImplemented(format!(
+                    "Unsupported data type for Arrow conversion: {other:?}"
+                )));
+            }
+        }
+    }
+    Ok(array_builders)
+}
+
+fn create_shared_array_builder_by_data_type(
+    data_type: DataType,
+    field_desc: FieldDescriptor,
+) -> datafusion::error::Result<SharedArrayBuilder> {
+    match data_type {
+        DataType::Boolean => {
+            return Ok(SharedArrayBuilder::new(BooleanBuilder::new()));
+        }
+        DataType::Int32 => {
+            return Ok(SharedArrayBuilder::new(Int32Builder::new()));
+        }
+        DataType::Int64 => {
+            return Ok(SharedArrayBuilder::new(Int64Builder::new()));
+        }
+        DataType::Utf8 => {
+            return Ok(SharedArrayBuilder::new(StringBuilder::new()));
+        }
+        DataType::Float32 => {
+            return Ok(SharedArrayBuilder::new(Float32Builder::new()));
+        }
+        DataType::Float64 => {
+            return Ok(SharedArrayBuilder::new(Float64Builder::new()));
+        }
+        DataType::UInt32 => {
+            return Ok(SharedArrayBuilder::new(UInt32Builder::new()));
+        }
+        DataType::UInt64 => {
+            return Ok(SharedArrayBuilder::new(UInt64Builder::new()));
+        }
+        DataType::Timestamp(TimeUnit::Millisecond, _) => {
+            return 
Ok(SharedArrayBuilder::new(TimestampMillisecondBuilder::new()));
+        }
+        DataType::Binary => {
+            return Ok(SharedArrayBuilder::new(BinaryBuilder::new()));
+        }
+        DataType::Struct(fields) => {
+            let field_kind = field_desc.kind();
+            let sub_msg_desc = field_kind.as_message().expect("as_message 
failed");
+            let struct_builder = create_output_array_builders(
+                &Arc::new(Schema::new(fields.clone())),
+                sub_msg_desc.clone(),
+            )
+            .expect("struct create_output_array_builders failed");
+            return Ok(SharedArrayBuilder::new(SharedStructArrayBuilder::new(
+                fields.clone(),
+                struct_builder,
+            )));
+        }
+        DataType::Map(field_ref, boolean) => {
+            let field_kind = field_desc.kind();
+            let sub_msg_desc = field_kind.as_message().expect("map as_message 
failed");
+            if let DataType::Struct(fields) = field_ref.data_type() {
+                return Ok(SharedArrayBuilder::new(SharedMapArrayBuilder::new(
+                    None,
+                    create_shared_array_builder_by_data_type(
+                        fields.get(0).expect("get 0 
failed").data_type().clone(),
+                        sub_msg_desc.get_field(1).expect("get map key failed"),
+                    )
+                    .expect("map create_shared_array_builder_by_data_type 
failed"),
+                    create_shared_array_builder_by_data_type(
+                        fields.get(1).expect("get 1 
failed").data_type().clone(),
+                        sub_msg_desc.get_field(2).expect("get map key failed"),
+                    )
+                    .expect("map create_shared_array_builder_by_data_type 
failed"),
+                )));
+            } else {
+                return df_execution_err!(
+                    "Map DataType Unsupported non-struct data type for Arrow 
conversion"
+                );
+            }
+        }
+        DataType::List(field_ref) => {
+            return Ok(SharedArrayBuilder::new(SharedListArrayBuilder::new(
+                
create_shared_array_builder_by_data_type(field_ref.data_type().clone(), 
field_desc)
+                    .expect("List create_shared_array_builder_by_data_type 
failed"),
+            )));
+        }
+        other => return df_execution_err!("Unsupported data type for Arrow 
conversion: {other:?}"),
+    }
+}
+
+pub(crate) fn ensure_output_array_builders_size(
+    builders: &[SharedArrayBuilder],
+) -> datafusion::error::Result<Box<dyn FnMut(usize) + Send + Sync>> {
+    #[derive(Debug, Clone, PartialEq, Eq, Hash)]
+    enum BuilderType {
+        Boolean,
+        Int32,
+        Int64,
+        UInt32,
+        UInt64,
+        String,
+        Float32,
+        Float64,
+        TimestampMillisecond,
+        Binary,
+        SharedArrayStruct,
+        SharedArrayList,
+        SharedArrayMap,
+    }
+    let mut classified_builders = HashMap::<BuilderType, 
Vec<SharedArrayBuilder>>::new();
+    let mut processing_builders = builders.to_vec();
+    while let Some(builder) = processing_builders.pop() {
+        if let Ok(_) = builder.get_mut::<BooleanBuilder>() {
+            classified_builders
+                .entry(BuilderType::Boolean)
+                .or_default()
+                .push(builder.clone());
+        } else if let Ok(_) = builder.get_mut::<Int32Builder>() {
+            classified_builders
+                .entry(BuilderType::Int32)
+                .or_default()
+                .push(builder.clone());
+        } else if let Ok(_) = builder.get_mut::<Int64Builder>() {
+            classified_builders
+                .entry(BuilderType::Int64)
+                .or_default()
+                .push(builder.clone());
+        } else if let Ok(_) = builder.get_mut::<UInt32Builder>() {
+            classified_builders
+                .entry(BuilderType::UInt32)
+                .or_default()
+                .push(builder.clone());
+        } else if let Ok(_) = builder.get_mut::<UInt64Builder>() {
+            classified_builders
+                .entry(BuilderType::UInt64)
+                .or_default()
+                .push(builder.clone());
+        } else if let Ok(_) = builder.get_mut::<StringBuilder>() {
+            classified_builders
+                .entry(BuilderType::String)
+                .or_default()
+                .push(builder.clone());
+        } else if let Ok(_) = builder.get_mut::<Float32Builder>() {
+            classified_builders
+                .entry(BuilderType::Float32)
+                .or_default()
+                .push(builder.clone());
+        } else if let Ok(_) = builder.get_mut::<Float64Builder>() {
+            classified_builders
+                .entry(BuilderType::Float64)
+                .or_default()
+                .push(builder.clone());
+        } else if let Ok(_) = builder.get_mut::<TimestampMillisecondBuilder>() 
{
+            classified_builders
+                .entry(BuilderType::TimestampMillisecond)
+                .or_default()
+                .push(builder.clone());
+        } else if let Ok(_) = builder.get_mut::<BinaryBuilder>() {
+            classified_builders
+                .entry(BuilderType::Binary)
+                .or_default()
+                .push(builder.clone());
+        } else if let Ok(struct_builder) = 
builder.get_mut::<SharedStructArrayBuilder>() {
+            classified_builders
+                .entry(BuilderType::SharedArrayStruct)
+                .or_default()
+                .push(builder.clone());
+            
processing_builders.extend(struct_builder.get_mut().get_field_builders().clone());
+        } else if let Ok(_) = builder.get_mut::<SharedListArrayBuilder>() {
+            classified_builders
+                .entry(BuilderType::SharedArrayList)
+                .or_default()
+                .push(builder.clone());
+        } else if let Ok(_) = builder.get_mut::<SharedMapArrayBuilder>() {
+            classified_builders
+                .entry(BuilderType::SharedArrayMap)
+                .or_default()
+                .push(builder.clone());
+        } else {
+            return Err(DataFusionError::NotImplemented(format!(
+                "Unsupported data type for Arrow conversion in ensure_size: 
{:?}",
+                builder.type_id()
+            )));
+        }
+    }
+
+    macro_rules! impl_for_builders {
+        ($builder_type:ty, $builders:expr, $append_fn:expr) => {{
+            let builders = $builders
+                .into_iter()
+                .map(|builder| builder.get_mut::<$builder_type>())
+                .collect::<datafusion::error::Result<Vec<_>>>()?;
+            Box::new(move |size| {
+                for builder in &builders {
+                    let builder = builder.get_mut();
+                    if builder.len() < size {
+                        fn wrap(append_fn: impl Fn(&mut $builder_type), b: 
&mut $builder_type) {
+                            append_fn(b);
+                        }
+                        wrap($append_fn, builder);
+                    }
+                }
+            }) as Box<dyn FnMut(usize) + Send + Sync>
+        }};
+    }
+
+    let mut adaptive_append_nulls = classified_builders
+        .into_iter()
+        .map(|(builder_type, builders)| {
+            Ok(match builder_type {
+                BuilderType::Boolean => {
+                    impl_for_builders!(BooleanBuilder, builders, |b| 
b.append_null())
+                }
+                BuilderType::Int32 => {
+                    impl_for_builders!(Int32Builder, builders, |b| 
b.append_value(0))
+                }
+                BuilderType::Int64 => {
+                    impl_for_builders!(Int64Builder, builders, |b| 
b.append_value(0))
+                }
+                BuilderType::UInt32 => {
+                    impl_for_builders!(UInt32Builder, builders, |b| 
b.append_value(0))
+                }
+                BuilderType::UInt64 => {
+                    impl_for_builders!(UInt64Builder, builders, |b| 
b.append_value(0))
+                }
+                BuilderType::String => {
+                    impl_for_builders!(StringBuilder, builders, |b| 
b.append_value(""))
+                }
+                BuilderType::Float32 => {
+                    impl_for_builders!(Float32Builder, builders, |b| 
b.append_value(0.0))
+                }
+                BuilderType::Float64 => {
+                    impl_for_builders!(Float64Builder, builders, |b| 
b.append_value(0.0))
+                }
+                BuilderType::TimestampMillisecond => {
+                    impl_for_builders!(TimestampMillisecondBuilder, builders, 
|b| b.append_null())
+                }
+                BuilderType::Binary => {
+                    impl_for_builders!(BinaryBuilder, builders, |b| 
b.append_value(b""))
+                }
+                BuilderType::SharedArrayStruct => {
+                    impl_for_builders!(SharedStructArrayBuilder, builders, |b| 
b.append(false))
+                }
+                BuilderType::SharedArrayList => {
+                    impl_for_builders!(SharedListArrayBuilder, builders, |b| 
b.append(true))
+                }
+                BuilderType::SharedArrayMap => {
+                    impl_for_builders!(SharedMapArrayBuilder, builders, |b| 
b.append(true))
+                }
+            })
+        })
+        .collect::<datafusion::error::Result<Vec<_>>>()?;
+
+    Ok(Box::new(move |size| {
+        adaptive_append_nulls.iter_mut().for_each(|imp| {
+            imp(size);
+        })
+    }))
+}
+
+fn get_output_array(
+    struct_array: &StructArray,
+    nested_field_name: &[usize],
+) -> datafusion::error::Result<ArrayRef> {
+    let column = struct_array.column(nested_field_name[0]);
+    if nested_field_name.len() > 1 {
+        return get_output_array(downcast_any!(column, StructArray)?, 
&nested_field_name[1..]);
+    }
+    Ok(column.clone())
+}
+
+fn create_value_handler(
+    message_descriptor: &MessageDescriptor,
+    tag_id: u32,
+    tag_to_output_index: &HashMap<u32, usize>,
+    pb_schema: &SchemaRef,
+    output_array_builders: &[SharedArrayBuilder],
+) -> datafusion::error::Result<ValueHandler> {
+    let output_index = tag_to_output_index.get(&tag_id);
+    let field = message_descriptor.get_field(tag_id);
+
+    if let Some((field, &output_index)) = field.clone().zip(output_index) {
+        let output_array_builder = output_array_builders[output_index].clone();
+        let output_field = pb_schema.field(output_index);
+
+        macro_rules! impl_for_builder {
+            ($encoding_tyname:ident, $handle_fn:expr) => {{
+                Box::new(move |cursor, tag, wire_type| {
+                    let merge_method = 
prost::encoding::$encoding_tyname::merge;
+                    let mut value = Default::default();
+                    merge_method(wire_type, &mut value, cursor, 
DecodeContext::default()).map_err(
+                        |e| {
+                            DataFusionError::Execution(format!(
+                                "Failed to decode {:?} [{}] and {} field: {}",
+                                wire_type,
+                                tag,
+                                stringify!($encoding_tyname),
+                                e
+                            ))
+                        },
+                    )?;
+                    $handle_fn(&value);
+                    Ok(())
+                })
+            }};
+        }
+
+        macro_rules! impl_for_bytes_builder {
+            ($encoding_tyname:ident, $handle_fn:expr) => {{
+                Box::new(move |cursor: &mut Cursor<&[u8]>, _tag, wire_type| {
+                    
prost::encoding::check_wire_type(WireType::LengthDelimited, wire_type)
+                        .or_else(|err| df_execution_err!("{err}"))?;
+                    let len = prost::encoding::decode_varint(cursor)
+                        .or_else(|err| df_execution_err!("{err}"))?;
+                    if len > cursor.remaining() as u64 {
+                        return df_execution_err!("buffer underflow");
+                    }
+                    let value = &cursor.get_mut()[cursor.position() as 
usize..][..len as usize];
+                    $handle_fn(value);
+                    cursor.advance(len as usize);
+                    Ok(())
+                })
+            }};
+        }
+
+        macro_rules! impl_for_repeated_builder {
+            ($encoding_tyname:ident, $handle_fn:expr) => {{
+                Box::new(move |cursor, tag, wire_type| {
+                    let merge_method = 
prost::encoding::$encoding_tyname::merge_repeated;
+                    let value = UnsafeCell::new(Default::default());
+                    merge_method(
+                        wire_type,
+                        unsafe { &mut *value.get() },
+                        cursor,
+                        DecodeContext::default(),
+                    )
+                    .map_err(|e| {
+                        DataFusionError::Execution(format!(
+                            "Failed to decode repeated {:?} [{}] and {} field: 
{}",
+                            wire_type,
+                            tag,
+                            stringify!($encoding_tyname),
+                            e
+                        ))
+                    })?;
+                    $handle_fn(unsafe { &*value.get() });
+                    unsafe { &mut *value.get() }.clear();
+                    Ok(())
+                })
+            }};
+        }
+
+        macro_rules! impl_for_message_builder {
+            ($handle_fn:expr) => {{
+                Box::new(move |cursor: &mut Cursor<&[u8]>, tag, wire_type| {
+                    
prost::encoding::check_wire_type(WireType::LengthDelimited, wire_type)
+                        .or_else(|err| df_execution_err!("{err}"))?;
+                    let len = prost::encoding::decode_varint(cursor)
+                        .or_else(|err| df_execution_err!("{err}"))?;
+                    if len > cursor.remaining() as u64 {
+                        return df_execution_err!("buffer underflow");
+                    }
+
+                    $handle_fn(&cursor.get_mut()[cursor.position() as 
usize..][..len as usize]);
+                    cursor.advance(len as usize);
+                    Ok(())
+                })
+            }};
+        }
+
+        match field.kind() {
+            Kind::Bool => {
+                if field.is_list() {
+                    let array_builder = output_array_builder
+                        .get_mut::<SharedListArrayBuilder>()
+                        .expect("SharedListArrayBuilder is null")
+                        .get_mut()
+                        .values()
+                        .get_mut::<BooleanBuilder>()?;
+                    if field.is_packed() {
+                        return Ok(impl_for_repeated_builder!(bool, |values: 
&Vec<bool>| {
+                            for value in values {
+                                array_builder.get_mut().append_value(*value);
+                            }
+                        }));
+                    } else {
+                        return Ok(impl_for_builder!(bool, |value: &bool| {
+                            array_builder.get_mut().append_value(*value);
+                        }));
+                    }
+                } else {
+                    let array_builder = 
output_array_builder.get_mut::<BooleanBuilder>()?;
+                    return Ok(impl_for_builder!(bool, |value: &bool| {
+                        array_builder.get_mut().append_value(*value);
+                    }));
+                }
+            }
+            Kind::Int32 => {
+                if field.is_list() {
+                    let array_builder = output_array_builder
+                        .get_mut::<SharedListArrayBuilder>()
+                        .expect("SharedListArrayBuilder is null")
+                        .get_mut()
+                        .values()
+                        .get_mut::<Int32Builder>()?;
+                    if field.is_packed() {
+                        return Ok(impl_for_repeated_builder!(int32, |values: 
&Vec<i32>| {
+                            for value in values {
+                                array_builder.get_mut().append_value(*value);
+                            }
+                        }));
+                    } else {
+                        return Ok(impl_for_builder!(int32, |value: &i32| {
+                            array_builder.get_mut().append_value(*value);
+                        }));
+                    }
+                } else {
+                    let array_builder = 
output_array_builder.get_mut::<Int32Builder>()?;
+                    return Ok(impl_for_builder!(int32, |value: &i32| {
+                        array_builder.get_mut().append_value(*value);
+                    }));
+                }
+            }
+            Kind::Int64 => {
+                if field.is_list() {
+                    let array_builder = output_array_builder
+                        .get_mut::<SharedListArrayBuilder>()
+                        .expect("SharedListArrayBuilder is null")
+                        .get_mut()
+                        .values()
+                        .get_mut::<Int64Builder>()?;
+                    if field.is_packed() {
+                        return Ok(impl_for_repeated_builder!(int64, |values: 
&Vec<i64>| {
+                            for value in values {
+                                array_builder.get_mut().append_value(*value);
+                            }
+                        }));
+                    } else {
+                        return Ok(impl_for_builder!(int64, |value: &i64| {
+                            array_builder.get_mut().append_value(*value);
+                        }));
+                    }
+                } else {
+                    let array_builder = 
output_array_builder.get_mut::<Int64Builder>()?;
+                    return Ok(impl_for_builder!(int64, |value: &i64| {
+                        array_builder.get_mut().append_value(*value);
+                    }));
+                }
+            }
+            Kind::String => {
+                if field.is_list() {
+                    let array_builder = output_array_builder
+                        .get_mut::<SharedListArrayBuilder>()
+                        .expect("SharedListArrayBuilder is null")
+                        .get_mut()
+                        .values()
+                        .get_mut::<StringBuilder>()?;
+                    return Ok(impl_for_bytes_builder!(string, |value: &[u8]| {
+                        let s = unsafe { str::from_utf8_unchecked(value) };
+                        array_builder.get_mut().append_value(s);
+                    }));
+                } else {
+                    let array_builder = 
output_array_builder.get_mut::<StringBuilder>()?;
+                    return Ok(impl_for_bytes_builder!(string, |value: &[u8]| {
+                        let s = unsafe { str::from_utf8_unchecked(value) };
+                        array_builder.get_mut().append_value(s);
+                    }));
+                }
+            }
+            Kind::Float => {
+                if field.is_list() {
+                    let array_builder = output_array_builder
+                        .get_mut::<SharedListArrayBuilder>()
+                        .expect("SharedListArrayBuilder is null")
+                        .get_mut()
+                        .values()
+                        .get_mut::<Float32Builder>()?;
+                    if field.is_packed() {
+                        return Ok(impl_for_repeated_builder!(float, |values: 
&Vec<f32>| {
+                            for value in values {
+                                array_builder.get_mut().append_value(*value);
+                            }
+                        }));
+                    } else {
+                        return Ok(impl_for_builder!(float, |value: &f32| {
+                            array_builder.get_mut().append_value(*value);
+                        }));
+                    }
+                } else {
+                    let array_builder = 
output_array_builder.get_mut::<Float32Builder>()?;
+                    return Ok(impl_for_builder!(float, |value: &f32| {
+                        array_builder.get_mut().append_value(*value);
+                    }));
+                }
+            }
+            Kind::Double => {
+                if field.is_list() {
+                    let array_builder = output_array_builder
+                        .get_mut::<SharedListArrayBuilder>()
+                        .expect("SharedListArrayBuilder is null")
+                        .get_mut()
+                        .values()
+                        .get_mut::<Float64Builder>()?;
+                    if field.is_packed() {
+                        return Ok(impl_for_repeated_builder!(double, |values: 
&Vec<f64>| {
+                            for value in values {
+                                array_builder.get_mut().append_value(*value);
+                            }
+                        }));
+                    } else {
+                        return Ok(impl_for_builder!(double, |value: &f64| {
+                            array_builder.get_mut().append_value(*value);
+                        }));
+                    }
+                } else {
+                    let array_builder = 
output_array_builder.get_mut::<Float64Builder>()?;
+                    return Ok(impl_for_builder!(double, |value: &f64| {
+                        array_builder.get_mut().append_value(*value);
+                    }));
+                }
+            }
+            Kind::Uint32 => {
+                if field.is_list() {
+                    let array_builder = output_array_builder
+                        .get_mut::<SharedListArrayBuilder>()
+                        .expect("SharedListArrayBuilder is null")
+                        .get_mut()
+                        .values()
+                        .get_mut::<UInt32Builder>()?;
+                    if field.is_packed() {
+                        return Ok(impl_for_repeated_builder!(uint32, |values: 
&Vec<u32>| {
+                            for value in values {
+                                array_builder.get_mut().append_value(*value);
+                            }
+                        }));
+                    } else {
+                        return Ok(impl_for_builder!(uint32, |value: &u32| {
+                            array_builder.get_mut().append_value(*value);
+                        }));
+                    }
+                } else {
+                    let array_builder = 
output_array_builder.get_mut::<UInt32Builder>()?;
+                    return Ok(impl_for_builder!(uint32, |value: &u32| {
+                        array_builder.get_mut().append_value(*value);
+                    }));
+                }
+            }
+            Kind::Uint64 => {
+                if field.is_list() {
+                    let array_builder = output_array_builder
+                        .get_mut::<SharedListArrayBuilder>()
+                        .expect("SharedListArrayBuilder is null")
+                        .get_mut()
+                        .values()
+                        .get_mut::<UInt64Builder>()?;
+                    if field.is_packed() {
+                        return Ok(impl_for_repeated_builder!(uint64, |values: 
&Vec<u64>| {
+                            for value in values {
+                                array_builder.get_mut().append_value(*value);
+                            }
+                        }));
+                    } else {
+                        return Ok(impl_for_builder!(uint64, |value: &u64| {
+                            array_builder.get_mut().append_value(*value);
+                        }));
+                    }
+                } else {
+                    let array_builder = 
output_array_builder.get_mut::<UInt64Builder>()?;
+                    return Ok(impl_for_builder!(uint64, |value: &u64| {
+                        array_builder.get_mut().append_value(*value);
+                    }));
+                }
+            }
+            Kind::Enum(enum_descriptor) => {
+                let mut enum_string_mapping = HashMap::new();
+                for enum_value_descriptor in enum_descriptor.values() {
+                    enum_string_mapping.insert(
+                        enum_value_descriptor.number(),
+                        
get_content_after_last_dot(enum_value_descriptor.name()).to_string(),
+                    );
+                }
+                if field.is_list() {
+                    let array_builder = output_array_builder
+                        .get_mut::<SharedListArrayBuilder>()
+                        .expect("SharedListArrayBuilder is null")
+                        .get_mut()
+                        .values()
+                        .get_mut::<StringBuilder>()?;
+                    if field.is_packed() {
+                        return Ok(impl_for_repeated_builder!(int32, |values: 
&Vec<i32>| {
+                            for value in values {
+                                array_builder.get_mut().append_value(
+                                    enum_string_mapping
+                                        .get(value)
+                                        .map_or("Unknown", |v| v.as_str()),
+                                );
+                            }
+                        }));
+                    } else {
+                        return Ok(impl_for_builder!(int32, |value: &i32| {
+                            array_builder.get_mut().append_value(
+                                enum_string_mapping
+                                    .get(value)
+                                    .map_or("Unknown", |v| v.as_str()),
+                            );
+                        }));
+                    }
+                } else {
+                    let array_builder = 
output_array_builder.get_mut::<StringBuilder>()?;
+                    return Ok(impl_for_builder!(int32, |value: &i32| {
+                        array_builder.get_mut().append_value(
+                            enum_string_mapping
+                                .get(value)
+                                .map_or("Unknown", |v| v.as_str()),
+                        );
+                    }));
+                }
+            }
+            Kind::Message(sub_message_descriptor) => {
+                if let DataType::Struct(sub_fields) = output_field.data_type() 
{
+                    let sub_pb_schema = 
Arc::new(Schema::new(sub_fields.clone()));
+                    let sub_tag_to_output_mapping = 
create_tag_to_output_mapping(
+                        sub_message_descriptor.clone(),
+                        &sub_pb_schema,
+                    );
+                    let sub_output_array_builders = output_array_builder
+                        .get_mut::<SharedStructArrayBuilder>()
+                        .expect("SharedStructArrayBuilder is null")
+                        .get_mut()
+                        .get_field_builders();
+                    let mut sub_value_handlers: ValueHandlerMap = 
Default::default();
+                    for field in sub_message_descriptor.fields() {
+                        if let Ok(handler) = create_value_handler(
+                            &sub_message_descriptor,
+                            field.number(),
+                            &sub_tag_to_output_mapping,
+                            &sub_pb_schema,
+                            &sub_output_array_builders,
+                        ) {
+                            sub_value_handlers.insert(field.number(), handler);
+                        } else {
+                            return df_execution_err!(
+                                "Failed to create value handler for sub field: 
{:?}, {}",
+                                field.kind(),
+                                output_field.data_type()
+                            );
+                        }
+                    }
+
+                    let struct_builder = output_array_builder
+                        .get_mut::<SharedStructArrayBuilder>()
+                        .expect("SharedStructArrayBuilder is null");
+
+                    return Ok(impl_for_message_builder!(|buf: &[u8]| {
+                        if buf.is_empty() {
+                            struct_builder.get_mut().append(false);
+                        } else {
+                            let mut sub_cursor = Cursor::new(buf);
+                            while sub_cursor.has_remaining() {
+                                if let Ok((sub_tag, sub_wire_type)) =
+                                    prost::encoding::decode_key(&mut 
sub_cursor)
+                                {
+                                    if let Some(sub_value_handler) =
+                                        sub_value_handlers.get(&sub_tag)
+                                    {
+                                        (*sub_value_handler)(
+                                            &mut sub_cursor,
+                                            sub_tag,
+                                            sub_wire_type,
+                                        )
+                                        .expect("Failed to process sub field");
+                                    }
+                                }
+                            }
+                            struct_builder.get_mut().append(true);
+                        }
+                    }));
+                } else if let DataType::List(struct_fields) = 
output_field.data_type() {
+                    if let DataType::Struct(sub_fields) = 
struct_fields.data_type() {
+                        let sub_pb_schema = 
Arc::new(Schema::new(sub_fields.clone()));
+                        let sub_tag_to_output_mapping = 
create_tag_to_output_mapping(
+                            sub_message_descriptor.clone(),
+                            &sub_pb_schema,
+                        );
+
+                        let sub_output_array_builders = output_array_builder
+                            .get_mut::<SharedListArrayBuilder>()
+                            .expect("SharedListArrayBuilder is null")
+                            .get_mut()
+                            .values()
+                            .get_mut::<SharedStructArrayBuilder>()
+                            .expect("SharedStructArrayBuilder is null")
+                            .get_mut()
+                            .get_field_builders();
+                        let mut sub_value_handlers: ValueHandlerMap = 
Default::default();
+                        for field in sub_message_descriptor.fields() {
+                            if let Ok(handler) = create_value_handler(
+                                &sub_message_descriptor,
+                                field.number(),
+                                &sub_tag_to_output_mapping,
+                                &sub_pb_schema,
+                                &sub_output_array_builders,
+                            ) {
+                                sub_value_handlers.insert(field.number(), 
handler);
+                            } else {
+                                return df_execution_err!(
+                                    "For List Struct Failed to create value 
handler for sub field: {:?}, {}",
+                                    field.kind(),
+                                    output_field.data_type()
+                                );
+                            }
+                        }
+                        return Ok(impl_for_message_builder!(|buf: &[u8]| {
+                            let struct_builder = output_array_builder
+                                .get_mut::<SharedListArrayBuilder>()
+                                .expect("SharedListArrayBuilder is null")
+                                .get_mut()
+                                .values()
+                                .get_mut::<SharedStructArrayBuilder>()
+                                .expect("SharedStructArrayBuilder is null");
+                            if buf.is_empty() {
+                                struct_builder.get_mut().append(false);
+                            } else {
+                                // 解析嵌套的 message
+                                let mut sub_cursor = Cursor::new(buf);
+                                while sub_cursor.has_remaining() {
+                                    if let Ok((sub_tag, sub_wire_type)) =
+                                        prost::encoding::decode_key(&mut 
sub_cursor)
+                                    {
+                                        if let Some(sub_value_handler) =
+                                            sub_value_handlers.get(&sub_tag)
+                                        {
+                                            (*sub_value_handler)(
+                                                &mut sub_cursor,
+                                                sub_tag,
+                                                sub_wire_type,
+                                            )
+                                            .expect("Failed to process sub 
field");
+                                        }
+                                    }
+                                }
+                                struct_builder.get_mut().append(true);
+                            }
+                        }));
+                    } else {
+                        return Err(DataFusionError::Execution(format!(
+                            "For List Struct Failed to create value handler 
field is not struct: {:?}, {}",
+                            field.kind(),
+                            output_field.data_type()
+                        )));
+                    }
+                } else if let DataType::Map(struct_fields, boolean) = 
output_field.data_type() {
+                    if let DataType::Struct(sub_fields) = 
struct_fields.data_type() {
+                        let sub_pb_schema = 
Arc::new(Schema::new(sub_fields.clone()));
+                        let sub_tag_to_output_mapping = 
create_tag_to_output_mapping(
+                            sub_message_descriptor.clone(),
+                            &sub_pb_schema,
+                        );
+                        let mut sub_value_handlers: ValueHandlerMap = 
Default::default();
+                        let map_builder = output_array_builder
+                            .get_mut::<SharedMapArrayBuilder>()
+                            .expect("SharedMapArrayBuilder is null");
+                        let map_key_value_builder = 
map_builder.get_mut().entries();
+                        let sub_output_array_builders = vec![
+                            map_key_value_builder.0.clone(),
+                            map_key_value_builder.1.clone(),
+                        ];
+                        for field in sub_message_descriptor.fields() {
+                            if let Ok(handler) = create_value_handler(
+                                &sub_message_descriptor,
+                                field.number(),
+                                &sub_tag_to_output_mapping,
+                                &sub_pb_schema,
+                                &sub_output_array_builders,
+                            ) {
+                                sub_value_handlers.insert(field.number(), 
handler);
+                            } else {
+                                return df_execution_err!(
+                                    "Failed to create value handler for sub 
field: {:?}, {}",
+                                    field.kind(),
+                                    output_field.data_type()
+                                );
+                            }
+                        }
+                        let map_builder = output_array_builder
+                            .get_mut::<SharedMapArrayBuilder>()
+                            .expect("SharedMapArrayBuilder is null");
+
+                        return Ok(impl_for_message_builder!(|buf: &[u8]| {
+                            if buf.is_empty() {
+                                map_builder.get_mut().append(true);
+                            } else {
+                                let mut sub_cursor = Cursor::new(buf);
+                                while sub_cursor.has_remaining() {
+                                    if let Ok((sub_tag, sub_wire_type)) =
+                                        prost::encoding::decode_key(&mut 
sub_cursor)
+                                    {
+                                        if let Some(sub_value_handler) =
+                                            sub_value_handlers.get(&sub_tag)
+                                        {
+                                            (*sub_value_handler)(
+                                                &mut sub_cursor,
+                                                sub_tag,
+                                                sub_wire_type,
+                                            )
+                                            .expect("Failed to process sub 
field");
+                                        }
+                                    }
+                                }
+                            }
+                        }));
+                    } else {
+                        return Err(DataFusionError::Execution(format!(
+                            "For Map Failed to create value handler field is 
not struct: {:?}, {}",
+                            field.kind(),
+                            output_field.data_type()
+                        )));
+                    }
+                } else {
+                    return Err(DataFusionError::Execution(format!(
+                        "Failed to create value handler field is not struct: 
{:?}, {}",
+                        field.kind(),
+                        output_field.data_type()
+                    )));
+                }
+            }
+            Kind::Bytes => {
+                if field.is_list() {
+                    let array_builder = output_array_builder
+                        .get_mut::<SharedListArrayBuilder>()
+                        .expect("SharedListArrayBuilder is null")
+                        .get_mut()
+                        .values()
+                        .get_mut::<BinaryBuilder>()?;
+                    return Ok(impl_for_builder!(bytes, |value: &Vec<u8>| {
+                        array_builder.get_mut().append_value(value);
+                    }));
+                } else {
+                    let array_builder = 
output_array_builder.get_mut::<BinaryBuilder>()?;
+                    return Ok(impl_for_builder!(bytes, |value: &Vec<u8>| {
+                        array_builder.get_mut().append_value(value);
+                    }));
+                }
+            }
+            other => {
+                return Err(DataFusionError::Execution(format!(
+                    "Failed to create value handler field: {:?}, {}",
+                    field.kind(),
+                    output_field.data_type()
+                )));
+            }
+        }
+    }
+
+    Ok(Box::new(|cursor, tag, wire_type| {
+        let mut skip_value = move || {
+            match wire_type {
+                WireType::Varint => {
+                    prost::encoding::decode_varint(cursor)?;
+                }
+                WireType::ThirtyTwoBit => {
+                    if cursor.remaining() < 4 {
+                        return Err(DecodeError::new("buffer underflow"));
+                    }
+                    cursor.advance(4);
+                }
+                WireType::SixtyFourBit => {
+                    if cursor.remaining() < 8 {
+                        return Err(DecodeError::new("buffer underflow"));
+                    }
+                    cursor.advance(8);
+                }
+                WireType::LengthDelimited => {
+                    let len = prost::encoding::decode_varint(cursor)? as usize;
+                    if cursor.remaining() < len {
+                        return Err(DecodeError::new("buffer underflow"));
+                    }
+                    cursor.advance(len);
+                }
+                _ => {
+                    UnknownField::decode_value(tag, wire_type, cursor, 
DecodeContext::default())?;
+                }
+            }
+            Ok(())
+        };
+
+        skip_value()
+            .map_err(|e| DataFusionError::Execution(format!("Failed to decode 
unknown value: {e}")))
+    }))
+}
+
+fn get_content_after_last_dot(s: &str) -> &str {
+    match s.rfind('.') {
+        Some(index) => &s[index + 1..],
+        None => s,
+    }
+}
+
+pub(crate) fn adaptive_append_children(
+    builder: &SharedArrayBuilder,
+) -> Option<Box<dyn FnMut(usize) + Send + Sync>> {
+    let mut appender = None;
+    if let Ok(builder) = builder.get_mut::<SharedStructArrayBuilder>() {
+        let ensure_size =
+            
ensure_output_array_builders_size(&builder.get_mut().get_field_builders())
+                .expect("ensure_output_array_builders_size failed");
+        appender = Some(ensure_size);
+    } else if let Ok(builder) = builder.get_mut::<SharedListArrayBuilder>() {
+        let f: Box<dyn FnMut(usize) + Send + Sync> =
+            Box::new(move |_| builder.get_mut().adaptive_append());
+        appender = Some(f);
+    } else if let Ok(builder) = builder.get_mut::<SharedMapArrayBuilder>() {
+        let f: Box<dyn FnMut(usize) + Send + Sync> =
+            Box::new(move |_| builder.get_mut().adaptive_append());
+        appender = Some(f);
+    }
+    appender
+}
+
+#[cfg(test)]
+mod tests {
+    use std::{collections::HashMap, sync::Arc};
+
+    use arrow::{
+        array::*,
+        datatypes::{DataType, Field, Schema},
+    };
+    use prost::Message as ProstMessage;
+    use prost_reflect::prost_types::{DescriptorProto, FileDescriptorProto, 
FileDescriptorSet};
+    use prost_types::{
+        FieldDescriptorProto,
+        field_descriptor_proto::{Label, Type},
+    };
+
+    use super::*;
+
+    fn create_test_descriptor() -> Vec<u8> {
+        let field_descriptors = vec![
+            // int32 id = 1;
+            FieldDescriptorProto {
+                name: Some("id".to_string()),
+                number: Some(1),
+                label: Some(Label::Optional as i32),
+                r#type: Some(Type::Int32 as i32),
+                type_name: None,
+                extendee: None,
+                default_value: None,
+                oneof_index: None,
+                json_name: Some("id".to_string()),
+                options: None,
+                proto3_optional: None,
+            },
+            // string name = 2;
+            FieldDescriptorProto {
+                name: Some("name".to_string()),
+                number: Some(2),
+                label: Some(Label::Optional as i32),
+                r#type: Some(Type::String as i32),
+                type_name: None,
+                extendee: None,
+                default_value: None,
+                oneof_index: None,
+                json_name: Some("name".to_string()),
+                options: None,
+                proto3_optional: None,
+            },
+            // double score = 3;
+            FieldDescriptorProto {
+                name: Some("score".to_string()),
+                number: Some(3),
+                label: Some(Label::Optional as i32),
+                r#type: Some(Type::Double as i32),
+                type_name: None,
+                extendee: None,
+                default_value: None,
+                oneof_index: None,
+                json_name: Some("score".to_string()),
+                options: None,
+                proto3_optional: None,
+            },
+            // bool active = 4;
+            FieldDescriptorProto {
+                name: Some("active".to_string()),
+                number: Some(4),
+                label: Some(Label::Optional as i32),
+                r#type: Some(Type::Bool as i32),
+                type_name: None,
+                extendee: None,
+                default_value: None,
+                oneof_index: None,
+                json_name: Some("active".to_string()),
+                options: None,
+                proto3_optional: None,
+            },
+        ];
+
+        let message_descriptor = DescriptorProto {
+            name: Some("TestMessage".to_string()),
+            field: field_descriptors,
+            extension: vec![],
+            nested_type: vec![],
+            enum_type: vec![],
+            extension_range: vec![],
+            oneof_decl: vec![],
+            options: None,
+            reserved_range: vec![],
+            reserved_name: vec![],
+        };
+
+        let file_descriptor = FileDescriptorProto {
+            name: Some("test.proto".to_string()),
+            package: Some("test".to_string()),
+            dependency: vec![],
+            public_dependency: vec![],
+            weak_dependency: vec![],
+            message_type: vec![message_descriptor],
+            enum_type: vec![],
+            service: vec![],
+            extension: vec![],
+            options: None,
+            source_code_info: None,
+            syntax: Some("proto3".to_string()),
+        };
+
+        let descriptor_set = FileDescriptorSet {
+            file: vec![file_descriptor],
+        };
+
+        let mut buf = Vec::new();
+        descriptor_set
+            .encode(&mut buf)
+            .expect("Failed to encode FileDescriptorSet");
+        buf
+    }
+
+    fn create_nested_test_descriptor() -> Vec<u8> {
+        let address_fields = vec![
+            // string street = 1;
+            FieldDescriptorProto {
+                name: Some("street".to_string()),
+                number: Some(1),
+                label: Some(Label::Optional as i32),
+                r#type: Some(Type::String as i32),
+                type_name: None,
+                extendee: None,
+                default_value: None,
+                oneof_index: None,
+                json_name: Some("street".to_string()),
+                options: None,
+                proto3_optional: None,
+            },
+            // string city = 2;
+            FieldDescriptorProto {
+                name: Some("city".to_string()),
+                number: Some(2),
+                label: Some(Label::Optional as i32),
+                r#type: Some(Type::String as i32),
+                type_name: None,
+                extendee: None,
+                default_value: None,
+                oneof_index: None,
+                json_name: Some("city".to_string()),
+                options: None,
+                proto3_optional: None,
+            },
+        ];
+
+        let address_descriptor = DescriptorProto {
+            name: Some("Address".to_string()),
+            field: address_fields,
+            extension: vec![],
+            nested_type: vec![],
+            enum_type: vec![],
+            extension_range: vec![],
+            oneof_decl: vec![],
+            options: None,
+            reserved_range: vec![],
+            reserved_name: vec![],
+        };
+
+        let person_fields = vec![
+            // string name = 1;
+            FieldDescriptorProto {
+                name: Some("name".to_string()),
+                number: Some(1),
+                label: Some(Label::Optional as i32),
+                r#type: Some(Type::String as i32),
+                type_name: None,
+                extendee: None,
+                default_value: None,
+                oneof_index: None,
+                json_name: Some("name".to_string()),
+                options: None,
+                proto3_optional: None,
+            },
+            // Address address = 2;
+            FieldDescriptorProto {
+                name: Some("address".to_string()),
+                number: Some(2),
+                label: Some(Label::Optional as i32),
+                r#type: Some(Type::Message as i32),
+                type_name: Some(".test.Address".to_string()),
+                extendee: None,
+                default_value: None,
+                oneof_index: None,
+                json_name: Some("address".to_string()),
+                options: None,
+                proto3_optional: None,
+            },
+        ];
+
+        let person_descriptor = DescriptorProto {
+            name: Some("Person".to_string()),
+            field: person_fields,
+            extension: vec![],
+            nested_type: vec![],
+            enum_type: vec![],
+            extension_range: vec![],
+            oneof_decl: vec![],
+            options: None,
+            reserved_range: vec![],
+            reserved_name: vec![],
+        };
+
+        let file_descriptor = FileDescriptorProto {
+            name: Some("nested_test.proto".to_string()),
+            package: Some("test".to_string()),
+            dependency: vec![],
+            public_dependency: vec![],
+            weak_dependency: vec![],
+            message_type: vec![address_descriptor, person_descriptor],
+            enum_type: vec![],
+            service: vec![],
+            extension: vec![],
+            options: None,
+            source_code_info: None,
+            syntax: Some("proto3".to_string()),
+        };
+
+        let descriptor_set = FileDescriptorSet {
+            file: vec![file_descriptor],
+        };
+
+        let mut buf = Vec::new();
+        descriptor_set
+            .encode(&mut buf)
+            .expect("Failed to encode FileDescriptorSet");
+        buf
+    }
+
+    fn create_test_message(id: i32, name: &str, score: f64, active: bool) -> 
Vec<u8> {
+        use prost::encoding::*;
+
+        let mut buf = Vec::new();
+
+        // id (field 1, int32)
+        encode_key(1, WireType::Varint, &mut buf);
+        encode_varint(id as u64, &mut buf);
+
+        // name (field 2, string)
+        encode_key(2, WireType::LengthDelimited, &mut buf);
+        encode_varint(name.len() as u64, &mut buf);
+        buf.extend_from_slice(name.as_bytes());
+
+        // score (field 3, double)
+        encode_key(3, WireType::SixtyFourBit, &mut buf);
+        buf.extend_from_slice(&score.to_le_bytes());
+
+        // active (field 4, bool)
+        encode_key(4, WireType::Varint, &mut buf);
+        encode_varint(active as u64, &mut buf);
+
+        buf
+    }
+
+    fn create_nested_test_message(name: &str, street: &str, city: &str) -> 
Vec<u8> {
+        use prost::encoding::*;
+
+        let mut buf = Vec::new();
+
+        // name (field 1, string)
+        encode_key(1, WireType::LengthDelimited, &mut buf);
+        encode_varint(name.len() as u64, &mut buf);
+        buf.extend_from_slice(name.as_bytes());
+
+        // address (field 2, message)
+        let mut address_buf = Vec::new();
+
+        // address.street (field 1, string)
+        encode_key(1, WireType::LengthDelimited, &mut address_buf);
+        encode_varint(street.len() as u64, &mut address_buf);
+        address_buf.extend_from_slice(street.as_bytes());
+
+        encode_key(2, WireType::LengthDelimited, &mut address_buf);
+        encode_varint(city.len() as u64, &mut address_buf);
+        address_buf.extend_from_slice(city.as_bytes());
+
+        encode_key(2, WireType::LengthDelimited, &mut buf);
+        encode_varint(address_buf.len() as u64, &mut buf);
+        buf.extend_from_slice(&address_buf);
+
+        buf
+    }
+
+    fn create_binary_array(messages: Vec<Vec<u8>>) -> BinaryArray {
+        let mut builder = BinaryBuilder::new();
+        for msg in messages {
+            builder.append_value(&msg);
+        }
+        builder.finish()
+    }
+
+    fn create_partition_array(partitions: Vec<i32>) -> Int32Array {
+        Int32Array::from(partitions)
+    }
+
+    fn create_offset_array(offsets: Vec<i64>) -> Int64Array {
+        Int64Array::from(offsets)
+    }
+
+    fn create_timestamp_array(timestamps: Vec<i64>) -> Int64Array {
+        Int64Array::from(timestamps)
+    }
+
+    #[test]
+    fn test_parse_messages_with_kafka_meta_basic() {
+        let descriptor_data = create_test_descriptor();
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("serialized_kafka_records_partition", DataType::Int32, 
false),
+            Field::new("serialized_kafka_records_offset", DataType::Int64, 
false),
+            Field::new("serialized_kafka_records_timestamp", DataType::Int64, 
false),
+            Field::new("id", DataType::Int32, true),
+            Field::new("name", DataType::Utf8, true),
+            Field::new("score", DataType::Float64, true),
+            Field::new("active", DataType::Boolean, true),
+        ]));
+
+        let mut deserializer = PbDeserializer::new(
+            descriptor_data,
+            "TestMessage", // 使用简短名称
+            schema.clone(),
+            &HashMap::new(),
+            &[],
+        )
+        .expect("Failed to create deserializer");
+
+        let messages = create_binary_array(vec![
+            create_test_message(1, "Alice", 95.5, true),
+            create_test_message(2, "Bob", 87.3, false),
+            create_test_message(3, "Charlie", 92.1, true),
+        ]);
+
+        let partitions = create_partition_array(vec![0, 0, 1]);
+        let offsets = create_offset_array(vec![100, 101, 50]);
+        let timestamps = create_timestamp_array(vec![1234567890000, 
1234567891000, 1234567892000]);
+
+        let batch = deserializer
+            .parse_messages_with_kafka_meta(&messages, &partitions, &offsets, 
&timestamps)
+            .expect("Failed to deserialize");
+
+        assert_eq!(batch.num_rows(), 3);
+        assert_eq!(batch.num_columns(), 7);
+
+        let partition_array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .expect("Failed to downcast partition array to Int32Array");
+        assert_eq!(partition_array.value(0), 0);
+        assert_eq!(partition_array.value(1), 0);
+        assert_eq!(partition_array.value(2), 1);
+
+        let offset_array = batch
+            .column(1)
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .expect("Failed to downcast offset array to Int64Array");
+        assert_eq!(offset_array.value(0), 100);
+        assert_eq!(offset_array.value(1), 101);
+        assert_eq!(offset_array.value(2), 50);
+
+        let timestamp_array = batch
+            .column(2)
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .expect("Failed to downcast timestamp array to Int64Array");
+        assert_eq!(timestamp_array.value(0), 1234567890000);
+        assert_eq!(timestamp_array.value(1), 1234567891000);
+        assert_eq!(timestamp_array.value(2), 1234567892000);
+
+        let id_array = batch
+            .column(3)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .expect("Failed to downcast id array to Int32Array");
+        assert_eq!(id_array.value(0), 1);
+        assert_eq!(id_array.value(1), 2);
+        assert_eq!(id_array.value(2), 3);
+
+        let name_array = batch
+            .column(4)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .expect("Failed to downcast name array to StringArray");
+        assert_eq!(name_array.value(0), "Alice");
+        assert_eq!(name_array.value(1), "Bob");
+        assert_eq!(name_array.value(2), "Charlie");
+
+        let score_array = batch
+            .column(5)
+            .as_any()
+            .downcast_ref::<Float64Array>()
+            .expect("Failed to downcast score array to Float64Array");
+        assert_eq!(score_array.value(0), 95.5);
+        assert_eq!(score_array.value(1), 87.3);
+        assert_eq!(score_array.value(2), 92.1);
+
+        let active_array = batch
+            .column(6)
+            .as_any()
+            .downcast_ref::<BooleanArray>()
+            .expect("Failed to downcast active array to BooleanArray");
+        assert!(active_array.value(0));
+        assert!(!active_array.value(1));
+        assert!(active_array.value(2));
+    }
+
+    #[test]
+    fn test_parse_messages_with_kafka_meta_nested() {
+        let descriptor_data = create_nested_test_descriptor();
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("serialized_kafka_records_partition", DataType::Int32, 
false),
+            Field::new("serialized_kafka_records_offset", DataType::Int64, 
false),
+            Field::new("serialized_kafka_records_timestamp", DataType::Int64, 
false),
+            Field::new("name", DataType::Utf8, true),
+            Field::new("street", DataType::Utf8, true),
+            Field::new("city", DataType::Utf8, true),
+        ]));
+
+        let mut nested_mapping = HashMap::new();
+        nested_mapping.insert("street".to_string(), 
"address.street".to_string());
+        nested_mapping.insert("city".to_string(), "address.city".to_string());
+
+        let mut deserializer = PbDeserializer::new(
+            descriptor_data,
+            "Person",
+            schema.clone(),
+            &nested_mapping,
+            &[],
+        )
+        .expect("Failed to create deserializer");
+
+        let messages = create_binary_array(vec![
+            create_nested_test_message("Alice", "123 Main St", "New York"),
+            create_nested_test_message("Bob", "456 Oak Ave", "Los Angeles"),
+        ]);
+
+        let partitions = create_partition_array(vec![0, 1]);
+        let offsets = create_offset_array(vec![200, 150]);
+        let timestamps = create_timestamp_array(vec![1234567893000, 
1234567894000]);
+
+        let batch = deserializer
+            .parse_messages_with_kafka_meta(&messages, &partitions, &offsets, 
&timestamps)
+            .expect("Failed to deserialize");
+
+        assert_eq!(batch.num_rows(), 2);
+        assert_eq!(batch.num_columns(), 6);
+
+        let partition_array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .expect("Failed to downcast partition array to Int32Array");
+        assert_eq!(partition_array.value(0), 0);
+        assert_eq!(partition_array.value(1), 1);
+
+        let name_array = batch
+            .column(3)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .expect("Failed to downcast name array to StringArray");
+        assert_eq!(name_array.value(0), "Alice");
+        assert_eq!(name_array.value(1), "Bob");
+
+        let street_array = batch
+            .column(4)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .expect("Failed to downcast street array to StringArray");
+        assert_eq!(street_array.value(0), "123 Main St");
+        assert_eq!(street_array.value(1), "456 Oak Ave");
+
+        let city_array = batch
+            .column(5)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .expect("Failed to downcast city array to StringArray");
+        assert_eq!(city_array.value(0), "New York");
+        assert_eq!(city_array.value(1), "Los Angeles");
+    }
+
+    #[test]
+    fn test_parse_messages_with_kafka_meta_empty() {
+        let descriptor_data = create_test_descriptor();
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("serialized_kafka_records_partition", DataType::Int32, 
false),
+            Field::new("serialized_kafka_records_offset", DataType::Int64, 
false),
+            Field::new("serialized_kafka_records_timestamp", DataType::Int64, 
false),
+            Field::new("id", DataType::Int32, true),
+            Field::new("name", DataType::Utf8, true),
+        ]));
+
+        let mut deserializer = PbDeserializer::new(
+            descriptor_data,
+            "TestMessage",
+            schema.clone(),
+            &HashMap::new(),
+            &[],
+        )
+        .expect("Failed to create deserializer");
+
+        let messages = create_binary_array(vec![]);
+        let partitions = create_partition_array(vec![]);
+        let offsets = create_offset_array(vec![]);
+        let timestamps = create_timestamp_array(vec![]);
+
+        let batch = deserializer
+            .parse_messages_with_kafka_meta(&messages, &partitions, &offsets, 
&timestamps)
+            .expect("Failed to deserialize");
+
+        assert_eq!(batch.num_rows(), 0);
+        assert_eq!(batch.num_columns(), 5);
+    }
+
+    #[test]
+    fn test_parse_messages_with_kafka_meta_different_partitions() {
+        let descriptor_data = create_test_descriptor();
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("serialized_kafka_records_partition", DataType::Int32, 
false),
+            Field::new("serialized_kafka_records_offset", DataType::Int64, 
false),
+            Field::new("serialized_kafka_records_timestamp", DataType::Int64, 
false),
+            Field::new("id", DataType::Int32, true),
+            Field::new("name", DataType::Utf8, true),
+        ]));
+
+        let mut deserializer = PbDeserializer::new(
+            descriptor_data,
+            "TestMessage",
+            schema.clone(),
+            &HashMap::new(),
+            &[],
+        )
+        .expect("Failed to create deserializer");
+
+        let messages = create_binary_array(vec![
+            create_test_message(1, "Alice", 95.5, true),
+            create_test_message(2, "Bob", 87.3, false),
+            create_test_message(3, "Charlie", 92.1, true),
+            create_test_message(4, "David", 88.0, false),
+        ]);
+
+        let partitions = create_partition_array(vec![0, 1, 0, 2]);
+        let offsets = create_offset_array(vec![100, 50, 200, 75]);
+        let timestamps = create_timestamp_array(vec![1000, 2000, 3000, 4000]);
+
+        let batch = deserializer
+            .parse_messages_with_kafka_meta(&messages, &partitions, &offsets, 
&timestamps)
+            .expect("Failed to deserialize");
+
+        assert_eq!(batch.num_rows(), 4);
+        assert_eq!(batch.num_columns(), 5);
+
+        // check partition
+        let partition_array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .expect("Failed to downcast partition array to Int32Array");
+        assert_eq!(partition_array.value(0), 0);
+        assert_eq!(partition_array.value(1), 1);
+        assert_eq!(partition_array.value(2), 0);
+        assert_eq!(partition_array.value(3), 2);
+
+        // check offset
+        let offset_array = batch
+            .column(1)
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .expect("Failed to downcast offset array to Int64Array");
+        assert_eq!(offset_array.value(0), 100);
+        assert_eq!(offset_array.value(1), 50);
+        assert_eq!(offset_array.value(2), 200);
+        assert_eq!(offset_array.value(3), 75);
+
+        // check timestamp
+        let timestamp_array = batch
+            .column(2)
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .expect("Failed to downcast timestamp array to Int64Array");
+        assert_eq!(timestamp_array.value(0), 1000);
+        assert_eq!(timestamp_array.value(1), 2000);
+        assert_eq!(timestamp_array.value(2), 3000);
+        assert_eq!(timestamp_array.value(3), 4000);
+
+        // check id
+        let id_array = batch
+            .column(3)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .expect("Failed to downcast id array to Int32Array");
+        assert_eq!(id_array.value(0), 1);
+        assert_eq!(id_array.value(1), 2);
+        assert_eq!(id_array.value(2), 3);
+        assert_eq!(id_array.value(3), 4);
+    }
+}
diff --git 
a/native-engine/datafusion-ext-plans/src/flink/serde/shared_array_builder.rs 
b/native-engine/datafusion-ext-plans/src/flink/serde/shared_array_builder.rs
new file mode 100644
index 00000000..ff5076d3
--- /dev/null
+++ b/native-engine/datafusion-ext-plans/src/flink/serde/shared_array_builder.rs
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::{cell::UnsafeCell, sync::Arc};
+
+use arrow::array::ArrayBuilder;
+use datafusion::common::DataFusionError;
+
+#[derive(Clone, Debug)]
+pub(crate) struct SharedArrayBuilder {
+    inner: Arc<UnsafeCell<dyn ArrayBuilder>>,
+}
+
+unsafe impl Send for SharedArrayBuilder {}
+unsafe impl Sync for SharedArrayBuilder {}
+
+impl SharedArrayBuilder {
+    pub fn new(builder: impl ArrayBuilder) -> Self {
+        Self {
+            inner: Arc::new(UnsafeCell::new(builder)),
+        }
+    }
+
+    #[allow(clippy::mut_from_ref)]
+    pub(crate) fn get_dyn_mut(&self) -> &mut dyn ArrayBuilder {
+        // safety: get value from UnsafeCell
+        unsafe { &mut *self.inner.get() }
+    }
+
+    pub(crate) fn get_mut<T: ArrayBuilder>(
+        &self,
+    ) -> datafusion::common::Result<SharedArrayBuilderHolder<T>> {
+        SharedArrayBuilderHolder::try_new(self.clone())
+    }
+
+    pub(crate) fn len(&self) -> usize {
+        self.get_dyn_mut().len()
+    }
+}
+pub struct SharedArrayBuilderHolder<T: ArrayBuilder> {
+    _shared: SharedArrayBuilder,
+    holder: UnsafeCell<&'static mut T>, // bypass lifetime checking
+}
+
+unsafe impl<T: ArrayBuilder> Send for SharedArrayBuilderHolder<T> {}
+unsafe impl<T: ArrayBuilder> Sync for SharedArrayBuilderHolder<T> {}
+
+impl<T: ArrayBuilder> SharedArrayBuilderHolder<T> {
+    fn try_new(shared: SharedArrayBuilder) -> datafusion::common::Result<Self> 
{
+        let shared_cloned = shared.clone();
+        let borrowed = shared.get_dyn_mut();
+        // println!("The generic type parameter is: {}", type_name::<T>());
+        let holder = borrowed.as_any_mut().downcast_mut::<T>().ok_or_else(|| {
+            DataFusionError::Execution(format!(
+                "failed to downcast array builder {shared_cloned:?}",
+            ))
+        })?;
+        Ok(Self {
+            _shared: shared_cloned,
+            holder: unsafe {
+                // safety: holder has the same lifetime as shared, this 
bypasses lifetime
+                // checking
+                UnsafeCell::new(std::mem::transmute::<_, &'static mut 
T>(holder))
+            },
+        })
+    }
+
+    #[allow(clippy::mut_from_ref)]
+    pub(crate) fn get_mut(&self) -> &mut T {
+        // safety: bypass mutability checking
+        unsafe { *self.holder.get() }
+    }
+}
diff --git 
a/native-engine/datafusion-ext-plans/src/flink/serde/shared_list_array_builder.rs
 
b/native-engine/datafusion-ext-plans/src/flink/serde/shared_list_array_builder.rs
new file mode 100644
index 00000000..ba54c114
--- /dev/null
+++ 
b/native-engine/datafusion-ext-plans/src/flink/serde/shared_list_array_builder.rs
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::{any::Any, sync::Arc};
+
+use arrow::{
+    array::{ArrayBuilder, ArrayRef, BufferBuilder, GenericListArray, 
NullBufferBuilder},
+    buffer::{Buffer, OffsetBuffer},
+};
+use arrow_schema::{Field, FieldRef};
+
+use crate::flink::serde::{
+    pb_deserializer::adaptive_append_children, 
shared_array_builder::SharedArrayBuilder,
+};
+
+pub struct SharedListArrayBuilder {
+    offsets_builder: BufferBuilder<i32>,
+    null_buffer_builder: NullBufferBuilder,
+    values_builder: SharedArrayBuilder,
+    field: Option<FieldRef>,
+    current_offset: i32,
+    adaptive_append_children: Option<Box<dyn FnMut(usize) + Send + Sync>>,
+}
+
+impl ArrayBuilder for SharedListArrayBuilder {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn as_any_mut(&mut self) -> &mut dyn Any {
+        self
+    }
+
+    fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
+        self
+    }
+
+    fn len(&self) -> usize {
+        self.null_buffer_builder.len()
+    }
+
+    fn finish(&mut self) -> ArrayRef {
+        Arc::new(self.finish())
+    }
+
+    fn finish_cloned(&self) -> ArrayRef {
+        Arc::new(self.finish_cloned())
+    }
+}
+
+impl SharedListArrayBuilder {
+    /// Creates a new [`SharedArrayListBuilder`] from a given values array
+    /// builder
+    pub(crate) fn new(values_builder: SharedArrayBuilder) -> Self {
+        let capacity = values_builder.len();
+        let appender = adaptive_append_children(&values_builder);
+        Self::with_capacity(values_builder, capacity, appender)
+    }
+
+    /// Creates a new [`SharedArrayListBuilder`] with specified capacity
+    pub(crate) fn with_capacity(
+        values_builder: SharedArrayBuilder,
+        capacity: usize,
+        adaptive_append_children: Option<Box<dyn FnMut(usize) + Send + Sync>>,
+    ) -> Self {
+        let mut offsets_builder = BufferBuilder::<i32>::new(capacity + 1);
+        offsets_builder.append(0);
+        Self {
+            offsets_builder,
+            null_buffer_builder: NullBufferBuilder::new(capacity),
+            values_builder,
+            field: None,
+            current_offset: 0,
+            adaptive_append_children,
+        }
+    }
+
+    /// Returns the child array builder as a reference
+    pub(crate) fn values(&mut self) -> &SharedArrayBuilder {
+        &self.values_builder
+    }
+
+    /// Finish the current variable-length list array slot
+    #[inline]
+    pub fn append(&mut self, is_valid: bool) {
+        if let Some(adaptive_append_children) = 
self.adaptive_append_children.as_mut() {
+            adaptive_append_children(self.values_builder.len());
+        }
+        self.offsets_builder
+            .append(self.values_builder.len() as i32);
+        self.null_buffer_builder.append(is_valid);
+    }
+
+    pub fn adaptive_append(&mut self) {
+        let next_offset = self.values_builder.len() as i32;
+        if next_offset > self.current_offset {
+            self.append(true);
+            self.current_offset = next_offset;
+        }
+    }
+
+    /// Builds the [`GenericListArray`] and reset this builder
+    pub fn finish(&mut self) -> GenericListArray<i32> {
+        let values = self.values_builder.get_dyn_mut().finish();
+        let nulls = self.null_buffer_builder.finish();
+
+        let offsets = self.offsets_builder.finish();
+        let offsets = unsafe { OffsetBuffer::new_unchecked(offsets.into()) };
+        self.offsets_builder.append(0);
+        self.current_offset = 0;
+
+        let field = match &self.field {
+            Some(f) => f.clone(),
+            None => Arc::new(Field::new_list_field(values.data_type().clone(), 
true)),
+        };
+
+        GenericListArray::new(field, offsets, values, nulls)
+    }
+
+    /// Builds the [`GenericListArray`] without resetting the builder
+    pub fn finish_cloned(&self) -> GenericListArray<i32> {
+        let values = self.values_builder.get_dyn_mut().finish_cloned();
+        let nulls = self.null_buffer_builder.finish_cloned();
+
+        let offsets = Buffer::from_slice_ref(self.offsets_builder.as_slice());
+        let offsets = unsafe { OffsetBuffer::new_unchecked(offsets.into()) };
+
+        let field = match &self.field {
+            Some(f) => f.clone(),
+            None => Arc::new(Field::new_list_field(values.data_type().clone(), 
true)),
+        };
+
+        GenericListArray::new(field, offsets, values, nulls)
+    }
+}
diff --git 
a/native-engine/datafusion-ext-plans/src/flink/serde/shared_map_array_builder.rs
 
b/native-engine/datafusion-ext-plans/src/flink/serde/shared_map_array_builder.rs
new file mode 100644
index 00000000..b5545e48
--- /dev/null
+++ 
b/native-engine/datafusion-ext-plans/src/flink/serde/shared_map_array_builder.rs
@@ -0,0 +1,243 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::{any::Any, sync::Arc};
+
+use arrow::{
+    array::{
+        Array, ArrayBuilder, ArrayData, ArrayRef, BufferBuilder, MapArray, 
NullBufferBuilder,
+        StructArray,
+    },
+    buffer::{Buffer, NullBuffer},
+};
+use arrow_schema::{DataType, Field, FieldRef};
+
+use crate::flink::serde::{
+    pb_deserializer::adaptive_append_children, 
shared_array_builder::SharedArrayBuilder,
+};
+
+pub struct SharedMapArrayBuilder {
+    offsets_builder: BufferBuilder<i32>,
+    null_buffer_builder: NullBufferBuilder,
+    field_names: MapFieldNames,
+    key_builder: SharedArrayBuilder,
+    value_builder: SharedArrayBuilder,
+    value_field: Option<FieldRef>,
+    current_offset: i32,
+    adaptive_append_children: Option<Box<dyn FnMut(usize) + Send + Sync>>,
+}
+
+/// The [`Field`] names for a [`MapArray`]
+#[derive(Debug, Clone)]
+pub struct MapFieldNames {
+    /// [`Field`] name for map entries
+    pub entry: String,
+    /// [`Field`] name for map key
+    pub key: String,
+    /// [`Field`] name for map value
+    pub value: String,
+}
+
+impl Default for MapFieldNames {
+    fn default() -> Self {
+        Self {
+            entry: "entries".to_string(),
+            key: "key".to_string(),
+            value: "value".to_string(),
+        }
+    }
+}
+
+impl SharedMapArrayBuilder {
+    /// Creates a new `MapBuilder`
+    pub(crate) fn new(
+        field_names: Option<MapFieldNames>,
+        key_builder: SharedArrayBuilder,
+        value_builder: SharedArrayBuilder,
+    ) -> Self {
+        let capacity = key_builder.len();
+
+        let mut appenders = vec![];
+        appenders.extend(adaptive_append_children(&key_builder));
+        appenders.extend(adaptive_append_children(&value_builder));
+        let appender = match appenders.len() {
+            1 => {
+                let mut appender = appenders
+                    .pop()
+                    .expect("appenders length is 1, pop should succeed");
+                Some(Box::new(move |size| appender(size)) as Box<dyn 
FnMut(usize) + Send + Sync>)
+            }
+            2 => {
+                let mut appender2 = appenders
+                    .pop()
+                    .expect("appenders length is 2, first pop should succeed");
+                let mut appender1 = appenders
+                    .pop()
+                    .expect("appenders length is 2, second pop should 
succeed");
+                Some(Box::new(move |size| {
+                    appender1(size);
+                    appender2(size);
+                }) as Box<dyn FnMut(usize) + Send + Sync>)
+            }
+            _ => None,
+        };
+        Self::with_capacity(field_names, key_builder, value_builder, capacity, 
appender)
+    }
+
+    /// Creates a new `MapBuilder` with specified capacity
+    pub(crate) fn with_capacity(
+        field_names: Option<MapFieldNames>,
+        key_builder: SharedArrayBuilder,
+        value_builder: SharedArrayBuilder,
+        capacity: usize,
+        adaptive_append_children: Option<Box<dyn FnMut(usize) + Send + Sync>>,
+    ) -> Self {
+        let mut offsets_builder = BufferBuilder::<i32>::new(capacity + 1);
+        offsets_builder.append(0);
+        Self {
+            offsets_builder,
+            null_buffer_builder: NullBufferBuilder::new(capacity),
+            field_names: field_names.unwrap_or_default(),
+            key_builder,
+            value_builder,
+            value_field: None,
+            current_offset: 0,
+            adaptive_append_children,
+        }
+    }
+
+    /// Returns the key array builder of the map
+    pub(crate) fn keys(&mut self) -> &mut SharedArrayBuilder {
+        &mut self.key_builder
+    }
+
+    /// Returns the value array builder of the map
+    pub(crate) fn values(&mut self) -> &mut SharedArrayBuilder {
+        &mut self.value_builder
+    }
+
+    /// Returns both the key and value array builders of the map
+    pub(crate) fn entries(&mut self) -> (&mut SharedArrayBuilder, &mut 
SharedArrayBuilder) {
+        (&mut self.key_builder, &mut self.value_builder)
+    }
+
+    /// Finish the current map array slot
+    #[inline]
+    pub fn append(&mut self, is_valid: bool) {
+        if let Some(adaptive_append_children) = 
self.adaptive_append_children.as_mut() {
+            adaptive_append_children(self.key_builder.len());
+        }
+        self.offsets_builder.append(self.key_builder.len() as i32);
+        self.null_buffer_builder.append(is_valid);
+    }
+
+    pub fn adaptive_append(&mut self) {
+        let next_offset = self.key_builder.len() as i32;
+        if next_offset > self.current_offset {
+            self.append(true);
+            self.current_offset = next_offset;
+        }
+    }
+
+    /// Builds the [`MapArray`]
+    pub fn finish(&mut self) -> MapArray {
+        let len = self.len();
+        let keys_arr = self.key_builder.get_dyn_mut().finish();
+        let values_arr = self.value_builder.get_dyn_mut().finish();
+        let offset_buffer = self.offsets_builder.finish();
+        self.offsets_builder.append(0);
+        self.current_offset = 0;
+        let null_bit_buffer = self.null_buffer_builder.finish();
+
+        self.finish_helper(keys_arr, values_arr, offset_buffer, 
null_bit_buffer, len)
+    }
+
+    /// Builds the [`MapArray`] without resetting the builder
+    pub fn finish_cloned(&self) -> MapArray {
+        let len = self.len();
+        let keys_arr = self.key_builder.get_dyn_mut().finish_cloned();
+        let values_arr = self.value_builder.get_dyn_mut().finish_cloned();
+        let offset_buffer = 
Buffer::from_slice_ref(self.offsets_builder.as_slice());
+        let nulls = self.null_buffer_builder.finish_cloned();
+        self.finish_helper(keys_arr, values_arr, offset_buffer, nulls, len)
+    }
+
+    fn finish_helper(
+        &self,
+        keys_arr: Arc<dyn Array>,
+        values_arr: Arc<dyn Array>,
+        offset_buffer: Buffer,
+        nulls: Option<NullBuffer>,
+        len: usize,
+    ) -> MapArray {
+        let keys_field = Arc::new(Field::new(
+            self.field_names.key.as_str(),
+            keys_arr.data_type().clone(),
+            true,
+        ));
+        let values_field = match &self.value_field {
+            Some(f) => f.clone(),
+            None => Arc::new(Field::new(
+                self.field_names.value.as_str(),
+                values_arr.data_type().clone(),
+                true,
+            )),
+        };
+
+        let struct_array =
+            StructArray::from(vec![(keys_field, keys_arr), (values_field, 
values_arr)]);
+
+        let map_field = Arc::new(Field::new(
+            self.field_names.entry.as_str(),
+            struct_array.data_type().clone(),
+            false,
+        ));
+        let array_data = ArrayData::builder(DataType::Map(map_field, false))
+            .len(len)
+            .add_buffer(offset_buffer)
+            .add_child_data(struct_array.into_data())
+            .nulls(nulls);
+
+        let array_data = unsafe { array_data.build_unchecked() };
+
+        MapArray::from(array_data)
+    }
+}
+
+impl ArrayBuilder for SharedMapArrayBuilder {
+    fn len(&self) -> usize {
+        self.null_buffer_builder.len()
+    }
+
+    fn finish(&mut self) -> ArrayRef {
+        Arc::new(self.finish())
+    }
+
+    fn finish_cloned(&self) -> ArrayRef {
+        Arc::new(self.finish_cloned())
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn as_any_mut(&mut self) -> &mut dyn Any {
+        self
+    }
+
+    fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
+        self
+    }
+}
diff --git 
a/native-engine/datafusion-ext-plans/src/flink/serde/shared_struct_array_builder.rs
 
b/native-engine/datafusion-ext-plans/src/flink/serde/shared_struct_array_builder.rs
new file mode 100644
index 00000000..57265106
--- /dev/null
+++ 
b/native-engine/datafusion-ext-plans/src/flink/serde/shared_struct_array_builder.rs
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::{any::Any, sync::Arc};
+
+use arrow::array::{ArrayBuilder, ArrayRef, NullBufferBuilder, StructArray};
+use arrow_schema::Fields;
+
+use crate::flink::serde::shared_array_builder::SharedArrayBuilder;
+
+pub struct SharedStructArrayBuilder {
+    fields: Fields,
+    field_builders: Vec<SharedArrayBuilder>,
+    null_buffer_builder: NullBufferBuilder,
+}
+
+impl std::fmt::Debug for SharedStructArrayBuilder {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("SharedStructArrayBuilder")
+            .field("fields", &self.fields)
+            .field("bitmap_builder", &self.null_buffer_builder)
+            .field("len", &self.len())
+            .finish()
+    }
+}
+
+impl ArrayBuilder for SharedStructArrayBuilder {
+    /// Returns the number of array slots in the builder.
+    ///
+    /// Note that this always return the first child field builder's length, 
and
+    /// it is the caller's responsibility to maintain the consistency that
+    /// all the child field builder should have the equal number of
+    /// elements.
+    fn len(&self) -> usize {
+        self.null_buffer_builder.len()
+    }
+
+    /// Builds the array.
+    fn finish(&mut self) -> ArrayRef {
+        Arc::new(self.finish())
+    }
+
+    /// Builds the array without resetting the builder.
+    fn finish_cloned(&self) -> ArrayRef {
+        Arc::new(self.finish_cloned())
+    }
+
+    /// Returns the builder as a non-mutable `Any` reference.
+    ///
+    /// This is most useful when one wants to call non-mutable APIs on a
+    /// specific builder type. In this case, one can first cast this into a
+    /// `Any`, and then use `downcast_ref` to get a reference on the
+    /// specific builder.
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Returns the builder as a mutable `Any` reference.
+    ///
+    /// This is most useful when one wants to call mutable APIs on a specific
+    /// builder type. In this case, one can first cast this into a `Any`,
+    /// and then use `downcast_mut` to get a reference on the specific
+    /// builder.
+    fn as_any_mut(&mut self) -> &mut dyn Any {
+        self
+    }
+
+    /// Returns the boxed builder as a box of `Any`.
+    fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
+        self
+    }
+}
+
+impl SharedStructArrayBuilder {
+    /// Creates a new `SharedStructArrayBuilder`
+    pub(crate) fn new(fields: impl Into<Fields>, field_builders: 
Vec<SharedArrayBuilder>) -> Self {
+        let fields: Fields = fields.into();
+        Self {
+            field_builders,
+            fields,
+            null_buffer_builder: NullBufferBuilder::new(0),
+        }
+    }
+
+    pub(crate) fn get_field_builders(&self) -> Vec<SharedArrayBuilder> {
+        self.field_builders.clone()
+    }
+
+    /// Appends an element (either null or non-null) to the struct. The actual
+    /// elements should be appended for each child sub-array in a consistent
+    /// way.
+    #[inline]
+    pub fn append(&mut self, is_valid: bool) {
+        self.null_buffer_builder.append(is_valid);
+    }
+
+    /// Builds the `StructArray` and reset this builder.
+    pub fn finish(&mut self) -> StructArray {
+        if self.fields.is_empty() {
+            return StructArray::new_empty_fields(self.len(), 
self.null_buffer_builder.finish());
+        }
+
+        let arrays = self
+            .field_builders
+            .iter_mut()
+            .map(|f| f.get_dyn_mut().finish())
+            .collect();
+        let nulls = self.null_buffer_builder.finish();
+        StructArray::new(self.fields.clone(), arrays, nulls)
+    }
+
+    /// Builds the `StructArray` without resetting the builder.
+    pub fn finish_cloned(&self) -> StructArray {
+        if self.fields.is_empty() {
+            return StructArray::new_empty_fields(
+                self.len(),
+                self.null_buffer_builder.finish_cloned(),
+            );
+        }
+
+        let arrays = self
+            .field_builders
+            .iter()
+            .map(|f| f.get_dyn_mut().finish_cloned())
+            .collect();
+
+        let nulls = self.null_buffer_builder.finish_cloned();
+
+        StructArray::new(self.fields.clone(), arrays, nulls)
+    }
+}
diff --git a/native-engine/datafusion-ext-plans/src/lib.rs 
b/native-engine/datafusion-ext-plans/src/lib.rs
index f85f3b26..d114c27a 100644
--- a/native-engine/datafusion-ext-plans/src/lib.rs
+++ b/native-engine/datafusion-ext-plans/src/lib.rs
@@ -21,6 +21,14 @@
 #![feature(portable_simd)]
 #![feature(ptr_as_ref_unchecked)]
 
+extern crate arrow;
+extern crate arrow_schema;
+extern crate bytes;
+extern crate datafusion;
+extern crate datafusion_ext_commons;
+extern crate prost;
+extern crate prost_reflect;
+
 // execution plan implementations
 pub mod agg;
 pub mod agg_exec;
@@ -50,6 +58,7 @@ pub mod window_exec;
 
 // helper modules
 pub mod common;
+pub mod flink;
 pub mod generate;
 mod scan;
 pub mod shuffle;

Reply via email to