Copilot commented on code in PR #2106:
URL: https://github.com/apache/auron/pull/2106#discussion_r2969169853


##########
native-engine/datafusion-ext-plans/src/flink/kafka_mock_scan_exec.rs:
##########
@@ -0,0 +1,466 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::{any::Any, fmt::Formatter, sync::Arc};
+
+use arrow::array::{
+    ArrayRef, BooleanBuilder, Float32Builder, Float64Builder, Int8Builder, 
Int16Builder,
+    Int32Builder, Int64Builder, LargeStringBuilder, RecordBatch, StringBuilder,
+    TimestampMicrosecondBuilder, TimestampMillisecondBuilder, UInt8Builder, 
UInt16Builder,
+    UInt32Builder, UInt64Builder,
+};
+use arrow_schema::{DataType, Field, SchemaRef, TimeUnit};
+use datafusion::{
+    common::{DataFusionError, Statistics},
+    error::Result,
+    execution::TaskContext,
+    physical_expr::{EquivalenceProperties, Partitioning::UnknownPartitioning},
+    physical_plan::{
+        DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, 
SendableRecordBatchStream,
+        execution_plan::{Boundedness, EmissionType},
+        metrics::{ExecutionPlanMetricsSet, MetricsSet},
+    },
+};
+use once_cell::sync::OnceCell;
+use sonic_rs::{JsonContainerTrait, JsonValueTrait};
+
+use crate::common::execution_context::ExecutionContext;
+
+#[derive(Debug, Clone)]
+pub struct KafkaMockScanExec {
+    schema: SchemaRef,
+    auron_operator_id: String,
+    mock_data_json_array: String,
+    metrics: ExecutionPlanMetricsSet,
+    props: OnceCell<PlanProperties>,
+}
+
+impl KafkaMockScanExec {
+    pub fn new(schema: SchemaRef, auron_operator_id: String, 
mock_data_json_array: String) -> Self {
+        Self {
+            schema,
+            auron_operator_id,
+            mock_data_json_array,
+            metrics: ExecutionPlanMetricsSet::new(),
+            props: OnceCell::new(),
+        }
+    }
+
+    fn execute_with_ctx(
+        &self,
+        exec_ctx: Arc<ExecutionContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let deserialized_pb_stream = mock_records(
+            exec_ctx.output_schema(),
+            exec_ctx.clone(),
+            self.mock_data_json_array.clone(),
+        )?;
+        Ok(deserialized_pb_stream)
+    }
+}
+
+impl DisplayAs for KafkaMockScanExec {
+    fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> 
std::fmt::Result {
+        write!(f, "KafkaMockScanExec")
+    }
+}
+
+impl ExecutionPlan for KafkaMockScanExec {
+    fn name(&self) -> &str {
+        "KafkaMockScanExec"
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        self.props.get_or_init(|| {
+            PlanProperties::new(
+                EquivalenceProperties::new(self.schema()),
+                UnknownPartitioning(1),
+                EmissionType::Both,
+                Boundedness::Unbounded {
+                    requires_infinite_memory: false,
+                },
+            )
+        })
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(Self::new(
+            self.schema.clone(),
+            self.auron_operator_id.clone(),
+            self.mock_data_json_array.clone(),
+        )))
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let exec_ctx = ExecutionContext::new(context, partition, 
self.schema(), &self.metrics);
+        self.execute_with_ctx(exec_ctx)
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+
+    fn statistics(&self) -> Result<Statistics> {
+        todo!()
+    }
+}
+
+fn mock_records(
+    schema: SchemaRef,
+    exec_ctx: Arc<ExecutionContext>,
+    mock_data_json_array: String,
+) -> Result<SendableRecordBatchStream> {
+    let json_value: sonic_rs::Value = 
sonic_rs::from_str(&mock_data_json_array).map_err(|e| {
+        DataFusionError::Execution(format!("mock_data_json_array is not valid 
JSON: {e}"))
+    })?;
+    let rows = json_value.as_array().ok_or_else(|| {
+        DataFusionError::Execution("mock_data_json_array must be a JSON 
array".to_string())
+    })?;
+
+    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
+    for field in schema.fields() {
+        let column = build_array_from_json(field, rows)?;
+        columns.push(column);
+    }
+
+    let batch = RecordBatch::try_new(schema.clone(), columns)?;
+
+    Ok(
+        exec_ctx.output_with_sender("KafkaMockScanExec.MockRecords", move 
|sender| async move {
+            sender.send(batch).await;
+            Ok(())
+        }),
+    )
+}
+
+fn build_array_from_json(field: &Field, rows: &sonic_rs::Array) -> 
Result<ArrayRef> {
+    let field_name = field.name();
+    let nullable = field.is_nullable();
+
+    macro_rules! build_typed_array {
+        ($builder_ty:ident, $extract:expr) => {{
+            let mut builder = $builder_ty::new();
+            for row in rows.iter() {
+                let val = row.get(field_name);
+                match val {
+                    Some(v) if !v.is_null() => {
+                        let extracted = ($extract)(v).ok_or_else(|| {
+                            DataFusionError::Execution(format!(
+                                "Field '{}' type mismatch, expected {}",
+                                field_name,
+                                field.data_type()
+                            ))
+                        })?;
+                        builder.append_value(extracted);
+                    }
+                    _ => {
+                        if nullable {
+                            builder.append_null();
+                        } else {
+                            return Err(DataFusionError::Execution(format!(
+                                "Field '{}' is non-nullable but got 
null/missing value",
+                                field_name
+                            )));
+                        }
+                    }
+                }
+            }
+            Ok(Arc::new(builder.finish()) as ArrayRef)
+        }};
+    }
+
+    match field.data_type() {
+        DataType::Boolean => {
+            build_typed_array!(BooleanBuilder, |v: &sonic_rs::Value| 
v.as_bool())
+        }
+        DataType::Int8 => {
+            build_typed_array!(Int8Builder, |v: &sonic_rs::Value| v
+                .as_i64()
+                .map(|n| n as i8))
+        }

Review Comment:
   The integer conversions use `as` casts (e.g., `n as i8`). For out-of-range 
JSON values this silently truncates/wraps, producing incorrect mock data 
without an error. Use checked conversions (e.g., `i8::try_from(n).ok()`, 
`u8::try_from(n).ok()`) and return a type-mismatch/range error when conversion 
fails.



##########
auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java:
##########
@@ -174,53 +177,63 @@ public void open(Configuration config) throws Exception {
                 this.auronOperatorId + "-" + 
getRuntimeContext().getIndexOfThisSubtask();
         scanExecNode.setAuronOperatorId(auronOperatorIdWithSubtaskIndex);
         scanExecNode.setStartupMode(KafkaStartupMode.valueOf(startupMode));
-        sourcePlan.setKafkaScan(scanExecNode.build());
-        this.physicalPlanNode = sourcePlan.build();
-
-        // 1. Initialize Kafka Consumer for partition metadata discovery only 
(not for data consumption)
-        Properties kafkaProps = new Properties();
-        kafkaProps.putAll(kafkaProperties);
-        // Override to ensure this consumer does not interfere with actual 
data consumption
-        kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, 
"flink-auron-fetch-meta");
-        kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
-        kafkaProps.put(
-                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-        kafkaProps.put(
-                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-        this.kafkaConsumer = new KafkaConsumer<>(kafkaProps);
-
         StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext) 
getRuntimeContext();
-        // 2. Discover and assign partitions for this subtask
-        List<PartitionInfo> partitionInfos = 
kafkaConsumer.partitionsFor(topic);
-        int subtaskIndex = runtimeContext.getIndexOfThisSubtask();
-        int numSubtasks = runtimeContext.getNumberOfParallelSubtasks();
-
         this.assignedPartitions = new ArrayList<>();
-        for (PartitionInfo partitionInfo : partitionInfos) {
-            int partitionId = partitionInfo.partition();
-            if (KafkaTopicPartitionAssigner.assign(topic, partitionId, 
numSubtasks) == subtaskIndex) {
-                assignedPartitions.add(partitionId);
-            }
-        }
-        boolean enableCheckpoint = runtimeContext.isCheckpointingEnabled();
-        Map<String, Object> auronRuntimeInfo = new HashMap<>();
-        auronRuntimeInfo.put("subtask_index", subtaskIndex);
-        auronRuntimeInfo.put("num_readers", numSubtasks);
-        auronRuntimeInfo.put("enable_checkpoint", enableCheckpoint);
-        auronRuntimeInfo.put("restored_offsets", restoredOffsets);
-        auronRuntimeInfo.put("assigned_partitions", assignedPartitions);
-        JniBridge.putResource(auronOperatorIdWithSubtaskIndex, 
mapper.writeValueAsString(auronRuntimeInfo));
         currentOffsets = new HashMap<>();
         pendingOffsetsToCommit = new LinkedMap();
-        LOG.info(
-                "Auron kafka source init successful, Auron operator id: {}, 
enableCheckpoint is {}, "
-                        + "subtask {} assigned partitions: {}",
-                auronOperatorIdWithSubtaskIndex,
-                enableCheckpoint,
-                subtaskIndex,
-                assignedPartitions);
+        if (mockData != null) {
+            scanExecNode.setMockDataJsonArray(mockData);
+            JsonNode mockDataJson = mapper.readTree(mockData);
+            for (JsonNode data : mockDataJson) {
+                int partition = 
data.get("serialized_kafka_records_partition").asInt();

Review Comment:
   In mock-data mode, the result of `mapper.readTree(mockData)` is assumed to be 
a JSON array of objects, each containing a `serialized_kafka_records_partition` 
field. If `mockData` is not an array, the for-each loop will not iterate the 
elements as intended; and if an element lacks that field, `data.get(...)` 
returns null, so `.asInt()` throws a NullPointerException (while a 
present-but-non-integer field silently coerces to the default value 0). Please 
validate `mockDataJson.isArray()` and that each element has the expected fields, 
and fail fast with a clear error message.
   ```suggestion
               if (!mockDataJson.isArray()) {
                   throw new IllegalArgumentException(
                           "Mock data for auron kafka source must be a JSON 
array of objects, but was: "
                                   + mockDataJson.getNodeType());
               }
               for (int i = 0; i < mockDataJson.size(); i++) {
                   JsonNode data = mockDataJson.get(i);
                   if (data == null || !data.isObject()) {
                       throw new IllegalArgumentException(
                               "Each element in mock data array must be a JSON 
object; invalid element at index "
                                       + i
                                       + " with type: "
                                       + (data == null ? "null" : 
data.getNodeType()));
                   }
                   JsonNode partitionNode = 
data.get("serialized_kafka_records_partition");
                   if (partitionNode == null || !partitionNode.isInt()) {
                       throw new IllegalArgumentException(
                               "Mock data element at index "
                                       + i
                                       + " must contain integer field 
'serialized_kafka_records_partition'");
                   }
                   int partition = partitionNode.intValue();
   ```



##########
auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java:
##########
@@ -174,53 +177,63 @@ public void open(Configuration config) throws Exception {
                 this.auronOperatorId + "-" + 
getRuntimeContext().getIndexOfThisSubtask();
         scanExecNode.setAuronOperatorId(auronOperatorIdWithSubtaskIndex);
         scanExecNode.setStartupMode(KafkaStartupMode.valueOf(startupMode));
-        sourcePlan.setKafkaScan(scanExecNode.build());
-        this.physicalPlanNode = sourcePlan.build();
-
-        // 1. Initialize Kafka Consumer for partition metadata discovery only 
(not for data consumption)
-        Properties kafkaProps = new Properties();
-        kafkaProps.putAll(kafkaProperties);
-        // Override to ensure this consumer does not interfere with actual 
data consumption
-        kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, 
"flink-auron-fetch-meta");
-        kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
-        kafkaProps.put(
-                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-        kafkaProps.put(
-                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-        this.kafkaConsumer = new KafkaConsumer<>(kafkaProps);
-
         StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext) 
getRuntimeContext();
-        // 2. Discover and assign partitions for this subtask
-        List<PartitionInfo> partitionInfos = 
kafkaConsumer.partitionsFor(topic);
-        int subtaskIndex = runtimeContext.getIndexOfThisSubtask();
-        int numSubtasks = runtimeContext.getNumberOfParallelSubtasks();
-
         this.assignedPartitions = new ArrayList<>();
-        for (PartitionInfo partitionInfo : partitionInfos) {
-            int partitionId = partitionInfo.partition();
-            if (KafkaTopicPartitionAssigner.assign(topic, partitionId, 
numSubtasks) == subtaskIndex) {
-                assignedPartitions.add(partitionId);
-            }
-        }
-        boolean enableCheckpoint = runtimeContext.isCheckpointingEnabled();
-        Map<String, Object> auronRuntimeInfo = new HashMap<>();
-        auronRuntimeInfo.put("subtask_index", subtaskIndex);
-        auronRuntimeInfo.put("num_readers", numSubtasks);
-        auronRuntimeInfo.put("enable_checkpoint", enableCheckpoint);
-        auronRuntimeInfo.put("restored_offsets", restoredOffsets);
-        auronRuntimeInfo.put("assigned_partitions", assignedPartitions);
-        JniBridge.putResource(auronOperatorIdWithSubtaskIndex, 
mapper.writeValueAsString(auronRuntimeInfo));
         currentOffsets = new HashMap<>();
         pendingOffsetsToCommit = new LinkedMap();
-        LOG.info(
-                "Auron kafka source init successful, Auron operator id: {}, 
enableCheckpoint is {}, "
-                        + "subtask {} assigned partitions: {}",
-                auronOperatorIdWithSubtaskIndex,
-                enableCheckpoint,
-                subtaskIndex,
-                assignedPartitions);
+        if (mockData != null) {
+            scanExecNode.setMockDataJsonArray(mockData);
+            JsonNode mockDataJson = mapper.readTree(mockData);
+            for (JsonNode data : mockDataJson) {
+                int partition = 
data.get("serialized_kafka_records_partition").asInt();
+                if (!assignedPartitions.contains(partition)) {
+                    assignedPartitions.add(partition);
+                }
+            }
+            LOG.info("Use mock data for auron kafka source, partition size = 
{}", assignedPartitions);

Review Comment:
   The log message says `partition size = {}` but logs the full 
`assignedPartitions` list. Either log `assignedPartitions.size()` or adjust the 
message to reflect that it prints the partition list.
   ```suggestion
               LOG.info("Use mock data for auron kafka source, partition size = 
{}", assignedPartitions.size());
   ```



##########
auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java:
##########
@@ -458,4 +471,9 @@ public void initializeState(FunctionInitializationContext 
context) throws Except
     public void setWatermarkStrategy(WatermarkStrategy<RowData> 
watermarkStrategy) {
         this.watermarkStrategy = watermarkStrategy;
     }
+
+    public void setMockData(String mockData) {
+        Preconditions.checkArgument(mockData != null, "Auron kafka source mock 
data must not null");

Review Comment:
   `setMockData` precondition message is grammatically incorrect: "must not 
null". Please change it to something like "must not be null" (or include the 
option key name to make failures easier to diagnose).
   ```suggestion
           Preconditions.checkArgument(mockData != null, "Auron kafka source 
mock data must not be null");
   ```



##########
auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java:
##########
@@ -107,7 +113,8 @@ public DynamicTableSource createDynamicTableSource(Context 
context) {
                     format,
                     formatConfig,
                     tableOptions.get(BUFFER_SIZE),
-                    tableOptions.get(START_UP_MODE));
+                    tableOptions.get(START_UP_MODE),
+                    tableOptions.get(KAFKA_MOCK_DATA));

Review Comment:
   `KAFKA_MOCK_DATA` is read via `tableOptions.get(KAFKA_MOCK_DATA)`. Because 
the option has `noDefaultValue()`, this makes the new option effectively 
required and will break existing tables that don't set it. Please use 
`getOptional(KAFKA_MOCK_DATA).orElse(null)` (or similar) and keep the 
downstream code path null/empty-safe.
   ```suggestion
                       tableOptions.getOptional(KAFKA_MOCK_DATA).orElse(null));
   ```



##########
native-engine/datafusion-ext-plans/src/flink/kafka_mock_scan_exec.rs:
##########
@@ -0,0 +1,466 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::{any::Any, fmt::Formatter, sync::Arc};
+
+use arrow::array::{
+    ArrayRef, BooleanBuilder, Float32Builder, Float64Builder, Int8Builder, 
Int16Builder,
+    Int32Builder, Int64Builder, LargeStringBuilder, RecordBatch, StringBuilder,
+    TimestampMicrosecondBuilder, TimestampMillisecondBuilder, UInt8Builder, 
UInt16Builder,
+    UInt32Builder, UInt64Builder,
+};
+use arrow_schema::{DataType, Field, SchemaRef, TimeUnit};
+use datafusion::{
+    common::{DataFusionError, Statistics},
+    error::Result,
+    execution::TaskContext,
+    physical_expr::{EquivalenceProperties, Partitioning::UnknownPartitioning},
+    physical_plan::{
+        DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, 
SendableRecordBatchStream,
+        execution_plan::{Boundedness, EmissionType},
+        metrics::{ExecutionPlanMetricsSet, MetricsSet},
+    },
+};
+use once_cell::sync::OnceCell;
+use sonic_rs::{JsonContainerTrait, JsonValueTrait};
+
+use crate::common::execution_context::ExecutionContext;
+
+#[derive(Debug, Clone)]
+pub struct KafkaMockScanExec {
+    schema: SchemaRef,
+    auron_operator_id: String,
+    mock_data_json_array: String,
+    metrics: ExecutionPlanMetricsSet,
+    props: OnceCell<PlanProperties>,
+}
+
+impl KafkaMockScanExec {
+    pub fn new(schema: SchemaRef, auron_operator_id: String, 
mock_data_json_array: String) -> Self {
+        Self {
+            schema,
+            auron_operator_id,
+            mock_data_json_array,
+            metrics: ExecutionPlanMetricsSet::new(),
+            props: OnceCell::new(),
+        }
+    }
+
+    fn execute_with_ctx(
+        &self,
+        exec_ctx: Arc<ExecutionContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let deserialized_pb_stream = mock_records(
+            exec_ctx.output_schema(),
+            exec_ctx.clone(),
+            self.mock_data_json_array.clone(),
+        )?;
+        Ok(deserialized_pb_stream)
+    }
+}
+
+impl DisplayAs for KafkaMockScanExec {
+    fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> 
std::fmt::Result {
+        write!(f, "KafkaMockScanExec")
+    }
+}
+
+impl ExecutionPlan for KafkaMockScanExec {
+    fn name(&self) -> &str {
+        "KafkaMockScanExec"
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        self.props.get_or_init(|| {
+            PlanProperties::new(
+                EquivalenceProperties::new(self.schema()),
+                UnknownPartitioning(1),
+                EmissionType::Both,
+                Boundedness::Unbounded {
+                    requires_infinite_memory: false,
+                },
+            )
+        })
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(Self::new(
+            self.schema.clone(),
+            self.auron_operator_id.clone(),
+            self.mock_data_json_array.clone(),
+        )))
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let exec_ctx = ExecutionContext::new(context, partition, 
self.schema(), &self.metrics);
+        self.execute_with_ctx(exec_ctx)
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+
+    fn statistics(&self) -> Result<Statistics> {
+        todo!()

Review Comment:
   `statistics()` is left as `todo!()`, which will panic if DataFusion requests 
plan statistics (e.g., for EXPLAIN, optimizations, or some execution paths). 
Please return an appropriate `Statistics` value (often “unknown”/default) 
instead of panicking.
   ```suggestion
           Ok(Statistics::default())
   ```



##########
native-engine/datafusion-ext-plans/src/flink/kafka_mock_scan_exec.rs:
##########
@@ -0,0 +1,466 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::{any::Any, fmt::Formatter, sync::Arc};
+
+use arrow::array::{
+    ArrayRef, BooleanBuilder, Float32Builder, Float64Builder, Int8Builder, 
Int16Builder,
+    Int32Builder, Int64Builder, LargeStringBuilder, RecordBatch, StringBuilder,
+    TimestampMicrosecondBuilder, TimestampMillisecondBuilder, UInt8Builder, 
UInt16Builder,
+    UInt32Builder, UInt64Builder,
+};
+use arrow_schema::{DataType, Field, SchemaRef, TimeUnit};
+use datafusion::{
+    common::{DataFusionError, Statistics},
+    error::Result,
+    execution::TaskContext,
+    physical_expr::{EquivalenceProperties, Partitioning::UnknownPartitioning},
+    physical_plan::{
+        DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, 
SendableRecordBatchStream,
+        execution_plan::{Boundedness, EmissionType},
+        metrics::{ExecutionPlanMetricsSet, MetricsSet},
+    },
+};
+use once_cell::sync::OnceCell;
+use sonic_rs::{JsonContainerTrait, JsonValueTrait};
+
+use crate::common::execution_context::ExecutionContext;
+
+#[derive(Debug, Clone)]
+pub struct KafkaMockScanExec {
+    schema: SchemaRef,
+    auron_operator_id: String,
+    mock_data_json_array: String,
+    metrics: ExecutionPlanMetricsSet,
+    props: OnceCell<PlanProperties>,
+}
+
+impl KafkaMockScanExec {
+    pub fn new(schema: SchemaRef, auron_operator_id: String, 
mock_data_json_array: String) -> Self {
+        Self {
+            schema,
+            auron_operator_id,
+            mock_data_json_array,
+            metrics: ExecutionPlanMetricsSet::new(),
+            props: OnceCell::new(),
+        }
+    }
+
+    fn execute_with_ctx(
+        &self,
+        exec_ctx: Arc<ExecutionContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let deserialized_pb_stream = mock_records(
+            exec_ctx.output_schema(),
+            exec_ctx.clone(),
+            self.mock_data_json_array.clone(),
+        )?;
+        Ok(deserialized_pb_stream)
+    }
+}
+
+impl DisplayAs for KafkaMockScanExec {
+    fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> 
std::fmt::Result {
+        write!(f, "KafkaMockScanExec")
+    }
+}
+
+impl ExecutionPlan for KafkaMockScanExec {
+    fn name(&self) -> &str {
+        "KafkaMockScanExec"
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        self.props.get_or_init(|| {
+            PlanProperties::new(
+                EquivalenceProperties::new(self.schema()),
+                UnknownPartitioning(1),
+                EmissionType::Both,
+                Boundedness::Unbounded {
+                    requires_infinite_memory: false,
+                },

Review Comment:
   `properties()` marks this exec as `Boundedness::Unbounded`, but 
`mock_records()` emits a finite set of batches and then completes. This 
mismatch can affect planning and streaming semantics. Consider using 
`Boundedness::Bounded` (or otherwise matching the actual runtime behavior).
   ```suggestion
                   Boundedness::Bounded,
   ```



##########
native-engine/datafusion-ext-plans/src/flink/kafka_mock_scan_exec.rs:
##########
@@ -0,0 +1,466 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::{any::Any, fmt::Formatter, sync::Arc};
+
+use arrow::array::{
+    ArrayRef, BooleanBuilder, Float32Builder, Float64Builder, Int8Builder, 
Int16Builder,
+    Int32Builder, Int64Builder, LargeStringBuilder, RecordBatch, StringBuilder,
+    TimestampMicrosecondBuilder, TimestampMillisecondBuilder, UInt8Builder, 
UInt16Builder,
+    UInt32Builder, UInt64Builder,
+};
+use arrow_schema::{DataType, Field, SchemaRef, TimeUnit};
+use datafusion::{
+    common::{DataFusionError, Statistics},
+    error::Result,
+    execution::TaskContext,
+    physical_expr::{EquivalenceProperties, Partitioning::UnknownPartitioning},
+    physical_plan::{
+        DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, 
SendableRecordBatchStream,
+        execution_plan::{Boundedness, EmissionType},
+        metrics::{ExecutionPlanMetricsSet, MetricsSet},
+    },
+};
+use once_cell::sync::OnceCell;
+use sonic_rs::{JsonContainerTrait, JsonValueTrait};
+
+use crate::common::execution_context::ExecutionContext;
+
+#[derive(Debug, Clone)]
+/// Leaf execution plan that mocks a Flink Kafka source: it parses a JSON
+/// array of row objects into Arrow record batches and streams them out.
+pub struct KafkaMockScanExec {
+    /// Output schema of the scan; drives per-field JSON decoding.
+    schema: SchemaRef,
+    /// Identifier of the Auron operator this exec represents.
+    auron_operator_id: String,
+    /// JSON array (as a string) holding the mock rows to emit.
+    mock_data_json_array: String,
+    /// Per-plan execution metrics, exposed via `ExecutionPlan::metrics`.
+    metrics: ExecutionPlanMetricsSet,
+    /// Lazily-initialized plan properties (see `properties()`).
+    props: OnceCell<PlanProperties>,
+}
+
+impl KafkaMockScanExec {
+    pub fn new(schema: SchemaRef, auron_operator_id: String, 
mock_data_json_array: String) -> Self {
+        Self {
+            schema,
+            auron_operator_id,
+            mock_data_json_array,
+            metrics: ExecutionPlanMetricsSet::new(),
+            props: OnceCell::new(),
+        }
+    }
+
+    fn execute_with_ctx(
+        &self,
+        exec_ctx: Arc<ExecutionContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let deserialized_pb_stream = mock_records(
+            exec_ctx.output_schema(),
+            exec_ctx.clone(),
+            self.mock_data_json_array.clone(),
+        )?;
+        Ok(deserialized_pb_stream)
+    }
+}
+
+impl DisplayAs for KafkaMockScanExec {
+    fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> 
std::fmt::Result {
+        write!(f, "KafkaMockScanExec")
+    }
+}
+
+impl ExecutionPlan for KafkaMockScanExec {
+    fn name(&self) -> &str {
+        "KafkaMockScanExec"
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        self.props.get_or_init(|| {
+            PlanProperties::new(
+                EquivalenceProperties::new(self.schema()),
+                UnknownPartitioning(1),
+                EmissionType::Both,
+                Boundedness::Unbounded {
+                    requires_infinite_memory: false,
+                },
+            )
+        })
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(Self::new(
+            self.schema.clone(),
+            self.auron_operator_id.clone(),
+            self.mock_data_json_array.clone(),
+        )))
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let exec_ctx = ExecutionContext::new(context, partition, 
self.schema(), &self.metrics);
+        self.execute_with_ctx(exec_ctx)
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+
+    fn statistics(&self) -> Result<Statistics> {
+        todo!()
+    }
+}
+
+fn mock_records(
+    schema: SchemaRef,
+    exec_ctx: Arc<ExecutionContext>,
+    mock_data_json_array: String,
+) -> Result<SendableRecordBatchStream> {
+    let json_value: sonic_rs::Value = 
sonic_rs::from_str(&mock_data_json_array).map_err(|e| {
+        DataFusionError::Execution(format!("mock_data_json_array is not valid 
JSON: {e}"))
+    })?;
+    let rows = json_value.as_array().ok_or_else(|| {
+        DataFusionError::Execution("mock_data_json_array must be a JSON 
array".to_string())
+    })?;
+
+    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
+    for field in schema.fields() {
+        let column = build_array_from_json(field, rows)?;
+        columns.push(column);
+    }
+
+    let batch = RecordBatch::try_new(schema.clone(), columns)?;
+
+    Ok(
+        exec_ctx.output_with_sender("KafkaMockScanExec.MockRecords", move 
|sender| async move {
+            sender.send(batch).await;
+            Ok(())
+        }),
+    )
+}
+
+fn build_array_from_json(field: &Field, rows: &sonic_rs::Array) -> 
Result<ArrayRef> {
+    let field_name = field.name();
+    let nullable = field.is_nullable();
+
+    macro_rules! build_typed_array {
+        ($builder_ty:ident, $extract:expr) => {{
+            let mut builder = $builder_ty::new();
+            for row in rows.iter() {
+                let val = row.get(field_name);
+                match val {
+                    Some(v) if !v.is_null() => {
+                        let extracted = ($extract)(v).ok_or_else(|| {
+                            DataFusionError::Execution(format!(
+                                "Field '{}' type mismatch, expected {}",
+                                field_name,
+                                field.data_type()
+                            ))
+                        })?;
+                        builder.append_value(extracted);
+                    }
+                    _ => {
+                        if nullable {
+                            builder.append_null();
+                        } else {
+                            return Err(DataFusionError::Execution(format!(
+                                "Field '{}' is non-nullable but got 
null/missing value",
+                                field_name
+                            )));
+                        }
+                    }
+                }
+            }
+            Ok(Arc::new(builder.finish()) as ArrayRef)
+        }};
+    }
+
+    match field.data_type() {
+        DataType::Boolean => {
+            build_typed_array!(BooleanBuilder, |v: &sonic_rs::Value| 
v.as_bool())
+        }
+        DataType::Int8 => {
+            build_typed_array!(Int8Builder, |v: &sonic_rs::Value| v
+                .as_i64()
+                .map(|n| n as i8))
+        }
+        DataType::Int16 => {
+            build_typed_array!(Int16Builder, |v: &sonic_rs::Value| v
+                .as_i64()
+                .map(|n| n as i16))
+        }
+        DataType::Int32 => {
+            build_typed_array!(Int32Builder, |v: &sonic_rs::Value| v
+                .as_i64()
+                .map(|n| n as i32))
+        }
+        DataType::Int64 => {
+            build_typed_array!(Int64Builder, |v: &sonic_rs::Value| v.as_i64())
+        }
+        DataType::UInt8 => {
+            build_typed_array!(UInt8Builder, |v: &sonic_rs::Value| v
+                .as_u64()
+                .map(|n| n as u8))
+        }
+        DataType::UInt16 => {
+            build_typed_array!(UInt16Builder, |v: &sonic_rs::Value| v
+                .as_u64()
+                .map(|n| n as u16))
+        }
+        DataType::UInt32 => {
+            build_typed_array!(UInt32Builder, |v: &sonic_rs::Value| v
+                .as_u64()
+                .map(|n| n as u32))
+        }
+        DataType::UInt64 => {
+            build_typed_array!(UInt64Builder, |v: &sonic_rs::Value| v.as_u64())
+        }
+        DataType::Float32 => {
+            build_typed_array!(Float32Builder, |v: &sonic_rs::Value| v
+                .as_f64()
+                .map(|n| n as f32))
+        }
+        DataType::Float64 => {
+            build_typed_array!(Float64Builder, |v: &sonic_rs::Value| 
v.as_f64())
+        }
+        DataType::Utf8 => {
+            build_typed_array!(StringBuilder, |v: &sonic_rs::Value| v
+                .as_str()
+                .map(|s| s.to_string()))
+        }
+        DataType::LargeUtf8 => {
+            build_typed_array!(LargeStringBuilder, |v: &sonic_rs::Value| v
+                .as_str()
+                .map(|s| s.to_string()))

Review Comment:
   For `Utf8`/`LargeUtf8`, the extractor currently builds an owned `String` 
(`s.to_string()`), but `StringBuilder`/`LargeStringBuilder` in this codebase 
are used with `append_value(&str)` (see existing usages). This is likely to 
fail to compile due to type mismatch. Prefer extracting `Option<&str>` (via 
`as_str()`) and appending that, only allocating when truly needed.
   ```suggestion
               build_typed_array!(StringBuilder, |v: &sonic_rs::Value| 
v.as_str())
           }
           DataType::LargeUtf8 => {
               build_typed_array!(LargeStringBuilder, |v: &sonic_rs::Value| 
v.as_str())
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to