XiaoHongbo-Hope commented on code in PR #7157:
URL: https://github.com/apache/paimon/pull/7157#discussion_r2752955897


##########
paimon-python/pypaimon/read/table_read.py:
##########
@@ -54,31 +58,49 @@ def _record_generator():
         return _record_generator()
 
     def to_arrow_batch_reader(self, splits: List[Split]) -> pyarrow.ipc.RecordBatchReader:
-        schema = PyarrowFieldParser.from_paimon_schema(self.read_type)
+        schema = self._schema_with_row_tracking_not_null(
+            PyarrowFieldParser.from_paimon_schema(self.read_type)
+        )
         batch_iterator = self._arrow_batch_generator(splits, schema)
         return pyarrow.ipc.RecordBatchReader.from_batches(schema, batch_iterator)
 
     @staticmethod
     def _try_to_pad_batch_by_schema(batch: pyarrow.RecordBatch, target_schema):
-        if batch.schema.names == target_schema.names:
-            return batch
-
-        columns = []
         num_rows = batch.num_rows
+        columns = []
 
+        batch_column_names = batch.schema.names  # pyarrow 0.17+; RecordBatch.column_names not in py36
         for field in target_schema:
-            if field.name in batch.column_names:
+            if field.name in batch_column_names:
                 col = batch.column(field.name)
+                if col.type != field.type:
+                    if col.type.id == field.type.id:
+                        col = col.cast(field.type)
+                    else:
+                        col = pyarrow.nulls(num_rows, type=field.type)
             else:
                 col = pyarrow.nulls(num_rows, type=field.type)
             columns.append(col)
 
         return pyarrow.RecordBatch.from_arrays(columns, schema=target_schema)
 
+    @staticmethod
+    def _schema_with_row_tracking_not_null(schema: pyarrow.Schema) -> pyarrow.Schema:
+        """Ensure _ROW_ID and _SEQUENCE_NUMBER are not null (per SpecialFields)."""
+        fields = []
+        for field in schema:
+            if field.name == SpecialFields.ROW_ID.name or field.name == SpecialFields.SEQUENCE_NUMBER.name:

Review Comment:
   > Why it can be nullable?
   
   This is a bug: the nullability information of the row-tracking system
   fields is lost during `_assign_row_tracking`. I opened a separate PR
   (#7174) to fix it.



##########
paimon-python/pypaimon/read/table_read.py:
##########
@@ -54,31 +58,49 @@ def _record_generator():
         return _record_generator()
 
     def to_arrow_batch_reader(self, splits: List[Split]) -> pyarrow.ipc.RecordBatchReader:
-        schema = PyarrowFieldParser.from_paimon_schema(self.read_type)
+        schema = self._schema_with_row_tracking_not_null(
+            PyarrowFieldParser.from_paimon_schema(self.read_type)
+        )
         batch_iterator = self._arrow_batch_generator(splits, schema)
         return pyarrow.ipc.RecordBatchReader.from_batches(schema, batch_iterator)
 
     @staticmethod
     def _try_to_pad_batch_by_schema(batch: pyarrow.RecordBatch, target_schema):
-        if batch.schema.names == target_schema.names:
-            return batch
-
-        columns = []
         num_rows = batch.num_rows
+        columns = []
 
+        batch_column_names = batch.schema.names  # pyarrow 0.17+; RecordBatch.column_names not in py36
         for field in target_schema:
-            if field.name in batch.column_names:
+            if field.name in batch_column_names:
                 col = batch.column(field.name)
+                if col.type != field.type:
+                    if col.type.id == field.type.id:
+                        col = col.cast(field.type)
+                    else:
+                        col = pyarrow.nulls(num_rows, type=field.type)
             else:
                 col = pyarrow.nulls(num_rows, type=field.type)
             columns.append(col)
 
         return pyarrow.RecordBatch.from_arrays(columns, schema=target_schema)
 
+    @staticmethod
+    def _schema_with_row_tracking_not_null(schema: pyarrow.Schema) -> pyarrow.Schema:
+        """Ensure _ROW_ID and _SEQUENCE_NUMBER are not null (per SpecialFields)."""
+        fields = []
+        for field in schema:
+            if field.name == SpecialFields.ROW_ID.name or field.name == SpecialFields.SEQUENCE_NUMBER.name:

Review Comment:
   > Why it can be nullable?
   
   This is a bug: the nullability information of the row-tracking system
   fields is lost during `_assign_row_tracking`. I opened a separate PR
   (#7174) to fix it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to