This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 75d00d7  #38 Expose More Metadata in Object APIs (#39)
75d00d7 is described below

commit 75d00d7ad94cc5fa83b03b4d4aa1f401429f0224
Author: ChengHui Chen <[email protected]>
AuthorDate: Wed Feb 19 15:27:43 2025 +0800

    #38 Expose More Metadata in Object APIs (#39)
---
 pypaimon/api/read_builder.py                |  9 ++++
 pypaimon/api/{split.py => row_type.py}      | 12 +++--
 pypaimon/api/split.py                       | 14 +++++-
 pypaimon/py4j/java_implementation.py        | 31 +++++++++++-
 pypaimon/py4j/tests/test_object_metadata.py | 73 +++++++++++++++++++++++++++++
 5 files changed, 134 insertions(+), 5 deletions(-)

diff --git a/pypaimon/api/read_builder.py b/pypaimon/api/read_builder.py
index 68b7d46..cd5715b 100644
--- a/pypaimon/api/read_builder.py
+++ b/pypaimon/api/read_builder.py
@@ -20,6 +20,8 @@ from abc import ABC, abstractmethod
 from pypaimon.api import TableRead, TableScan, Predicate, PredicateBuilder
 from typing import List
 
+from pypaimon.api.row_type import RowType
+
 
 class ReadBuilder(ABC):
     """An interface for building the TableScan and TableRead."""
@@ -50,3 +52,10 @@ class ReadBuilder(ABC):
     @abstractmethod
     def new_predicate_builder(self) -> PredicateBuilder:
         """Create a builder for Predicate."""
+
+    @abstractmethod
+    def read_type(self) -> RowType:
+        """
+        Return the row type of the builder. If there is a projection inside
+        the builder, the row type will only contain the selected fields.
+        """
diff --git a/pypaimon/api/split.py b/pypaimon/api/row_type.py
similarity index 80%
copy from pypaimon/api/split.py
copy to pypaimon/api/row_type.py
index 386c72b..b15c413 100644
--- a/pypaimon/api/split.py
+++ b/pypaimon/api/row_type.py
@@ -16,8 +16,14 @@
 # limitations under the License.
 
#################################################################################
 
-from abc import ABC
+import pyarrow as pa
 
+from abc import ABC, abstractmethod
 
-class Split(ABC):
-    """An input split for reading. The most important subclass is DataSplit."""
+
+class RowType(ABC):
+    """Data type of a sequence of fields."""
+
+    @abstractmethod
+    def as_arrow(self) -> "pa.Schema":
+        """Return the row type as an Arrow schema."""
diff --git a/pypaimon/api/split.py b/pypaimon/api/split.py
index 386c72b..5b9115c 100644
--- a/pypaimon/api/split.py
+++ b/pypaimon/api/split.py
@@ -16,8 +16,20 @@
 # limitations under the License.
 
#################################################################################
 
-from abc import ABC
+from abc import ABC, abstractmethod
+
+from typing import Iterator
 
 
 class Split(ABC):
     """An input split for reading. The most important subclass is DataSplit."""
+
+    @abstractmethod
+    def row_count(self) -> int:
+        """Return the total row count of the split."""
+
+    def file_size(self) -> int:
+        """Return the total file size of the split."""
+
+    def file_paths(self) -> Iterator[str]:
+        """Return the paths of all raw files in the split."""
diff --git a/pypaimon/py4j/java_implementation.py b/pypaimon/py4j/java_implementation.py
index ce90bc5..9f378b7 100644
--- a/pypaimon/py4j/java_implementation.py
+++ b/pypaimon/py4j/java_implementation.py
@@ -24,7 +24,7 @@ import pyarrow as pa
 from pypaimon.py4j.java_gateway import get_gateway
 from pypaimon.py4j.util import java_utils, constants
 from pypaimon.api import \
-    (catalog, table, read_builder, table_scan, split,
+    (catalog, table, read_builder, table_scan, split, row_type,
      table_read, write_builder, table_write, commit_message,
      table_commit, Schema, predicate)
 from typing import List, Iterator, Optional, Any, TYPE_CHECKING
@@ -115,6 +115,18 @@ class ReadBuilder(read_builder.ReadBuilder):
     def new_predicate_builder(self) -> 'PredicateBuilder':
         return PredicateBuilder(self._j_row_type)
 
+    def read_type(self) -> 'RowType':
+        return RowType(self._j_read_builder.readType())
+
+
+class RowType(row_type.RowType):
+
+    def __init__(self, j_row_type):
+        self._j_row_type = j_row_type
+
+    def as_arrow(self) -> "pa.Schema":
+        return java_utils.to_arrow_schema(self._j_row_type)
+
 
 class TableScan(table_scan.TableScan):
 
@@ -144,6 +156,23 @@ class Split(split.Split):
     def to_j_split(self):
         return self._j_split
 
+    def row_count(self) -> int:
+        return self._j_split.rowCount()
+
+    def file_size(self) -> int:
+        files_optional = self._j_split.convertToRawFiles()
+        if not files_optional.isPresent():
+            return 0
+        files = files_optional.get()
+        return sum(file.length() for file in files)
+
+    def file_paths(self) -> List[str]:
+        files_optional = self._j_split.convertToRawFiles()
+        if not files_optional.isPresent():
+            return []
+        files = files_optional.get()
+        return [file.path() for file in files]
+
 
 class TableRead(table_read.TableRead):
 
diff --git a/pypaimon/py4j/tests/test_object_metadata.py b/pypaimon/py4j/tests/test_object_metadata.py
new file mode 100644
index 0000000..e3591c9
--- /dev/null
+++ b/pypaimon/py4j/tests/test_object_metadata.py
@@ -0,0 +1,73 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import os
+import pyarrow as pa
+
+from pypaimon import Schema
+from pypaimon.py4j.tests import PypaimonTestBase
+
+
+class ObjectInfoTest(PypaimonTestBase):
+
+    @classmethod
+    def setUpClass(cls):
+        super().setUpClass()
+        cls.simple_pa_schema = pa.schema([
+            ('f0', pa.int32()),
+            ('f1', pa.string())
+        ])
+
+    def test_read_type_metadata(self):
+        schema = Schema(self.simple_pa_schema)
+        self.catalog.create_table('default.test_read_type_metadata', schema, False)
+        table = self.catalog.get_table('default.test_read_type_metadata')
+
+        read_builder = table.new_read_builder()
+        read_builder.with_projection(['f1'])
+        pa_schema = read_builder.read_type().as_arrow()
+
+        self.assertEqual(len(pa_schema.names), 1)
+        self.assertEqual(pa_schema.names[0], 'f1')
+
+    def test_split_metadata(self):
+        schema = Schema(self.simple_pa_schema)
+        self.catalog.create_table('default.test_split_metadata', schema, False)
+        table = self.catalog.get_table('default.test_split_metadata')
+
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        data = {
+            'f0': [1, 2, 3, 4, 5],
+            'f1': ['a', 'b', 'c', 'd', 'e'],
+        }
+        pa_table = pa.Table.from_pydict(data, schema=self.simple_pa_schema)
+        table_write.write_arrow(pa_table)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        splits = table_scan.plan().splits()
+
+        self.assertEqual(len(splits), 1)
+        self.assertEqual(len(splits[0].file_paths()), 1)
+        self.assertEqual(splits[0].row_count(), 5)
+        self.assertTrue(splits[0].file_paths()[0].endswith('.parquet'))
+        self.assertEqual(splits[0].file_size(), os.path.getsize(splits[0].file_paths()[0]))

Reply via email to