This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 1b26535  [feat] Add examples and stub files for Python bindings (#10)
1b26535 is described below

commit 1b265353f7329cbc83b50e443aa44288899d370e
Author: naivedogger <[email protected]>
AuthorDate: Fri Oct 17 16:54:04 2025 +0800

    [feat] Add examples and stub files for Python bindings (#10)
    
    ---------
    
    Co-authored-by: luoyuxia <[email protected]>
---
 .licenserc.yaml                    |   1 +
 bindings/python/README.md          |  19 ++--
 bindings/python/example/example.py | 188 +++++++++++++++++++++++++++++++++++++
 bindings/python/fluss/__init__.pyi | 171 +++++++++++++++++++++++++++++++++
 bindings/python/fluss/py.typed     |   0
 5 files changed, 369 insertions(+), 10 deletions(-)

diff --git a/.licenserc.yaml b/.licenserc.yaml
index 3813b48..a3cfcd1 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -26,4 +26,5 @@ header:
     - 'LICENSE'
     - 'NOTICE'
     - 'DISCLAIMER'
+    - 'bindings/python/fluss/py.typed'
   comment: on-failure
diff --git a/bindings/python/README.md b/bindings/python/README.md
index 5258f53..44d6099 100644
--- a/bindings/python/README.md
+++ b/bindings/python/README.md
@@ -108,7 +108,7 @@ uv run python example/example.py
 ### Build API docs:
 
 ```bash
-uv run pdoc fluss_python
+uv run pdoc fluss
 ```
 
 ### Release
@@ -124,10 +124,10 @@ uv run maturin publish
 ## Project Structure
 ```
 bindings/python/
-├── Cargo.toml              # Rust dependency configuration
-├── pyproject.toml          # Python project configuration
-├── README.md              # This file
-├── src/                   # Rust source code
+├── Cargo.toml            # Rust dependency configuration
+├── pyproject.toml        # Python project configuration
+├── README.md             # This file
+├── src/                  # Rust source code
 │   ├── lib.rs            # Main entry module
 │   ├── config.rs         # Configuration related
 │   ├── connection.rs     # Connection management
@@ -135,11 +135,10 @@ bindings/python/
 │   ├── table.rs          # Table operations
 │   ├── types.rs          # Data types
 │   └── error.rs          # Error handling
-├── python/               # Python package source
-│   └── fluss_python/
-│       ├── __init__.py   # Python package entry
-│       ├── __init__.pyi  # Stub file
-│       └── py.typed      # Type declarations
+├── fluss/                # Python package source
+│   ├── __init__.py       # Python package entry
+│   ├── __init__.pyi      # Stub file
+│   └── py.typed          # Type declarations
 └── example/              # Example code
     └── example.py
 ```
diff --git a/bindings/python/example/example.py b/bindings/python/example/example.py
new file mode 100644
index 0000000..0523f94
--- /dev/null
+++ b/bindings/python/example/example.py
@@ -0,0 +1,188 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import asyncio
+import time
+
+import pandas as pd
+import pyarrow as pa
+
+import fluss
+
+
+async def main():
+    # Create connection configuration
+    config_spec = {
+        "bootstrap.servers": "127.0.0.1:9123",
+        # Add other configuration options as needed
+        "request.max.size": "10485760",  # 10 MB
+        "writer.acks": "all",  # Wait for all replicas to acknowledge
+        "writer.retries": "3",  # Retry up to 3 times on failure
+        "writer.batch.size": "1000",  # Batch size for writes
+    }
+    config = fluss.Config(config_spec)
+
+    # Create connection using the static connect method
+    conn = await fluss.FlussConnection.connect(config)
+
+    # Define fields for PyArrow
+    fields = [
+        pa.field("id", pa.int32()),
+        pa.field("name", pa.string()),
+        pa.field("score", pa.float32()),
+        pa.field("age", pa.int32()),
+    ]
+
+    # Create a PyArrow schema
+    schema = pa.schema(fields)
+
+    # Create a Fluss Schema first (this is what TableDescriptor expects)
+    fluss_schema = fluss.Schema(schema)
+
+    # Create a Fluss TableDescriptor
+    table_descriptor = fluss.TableDescriptor(fluss_schema)
+
+    # Get the admin for Fluss
+    admin = await conn.get_admin()
+
+    # Create a Fluss table
+    table_path = fluss.TablePath("fluss", "sample_table")
+
+    try:
+        await admin.create_table(table_path, table_descriptor, True)
+        print(f"Created table: {table_path}")
+    except Exception as e:
+        print(f"Table creation failed: {e}")
+
+    # Get table information via admin
+    try:
+        table_info = await admin.get_table(table_path)
+        print(f"Table info: {table_info}")
+        print(f"Table ID: {table_info.table_id}")
+        print(f"Schema ID: {table_info.schema_id}")
+        print(f"Created time: {table_info.created_time}")
+        print(f"Primary keys: {table_info.get_primary_keys()}")
+    except Exception as e:
+        print(f"Failed to get table info: {e}")
+
+    # Get the table instance
+    table = await conn.get_table(table_path)
+    print(f"Got table: {table}")
+
+    # Create a writer for the table
+    append_writer = await table.new_append_writer()
+    print(f"Created append writer: {append_writer}")
+
+    try:
+        # Test 1: Write PyArrow Table
+        print("\n--- Testing PyArrow Table write ---")
+        pa_table = pa.Table.from_arrays(
+            [
+                pa.array([1, 2, 3], type=pa.int32()),
+                pa.array(["Alice", "Bob", "Charlie"], type=pa.string()),
+                pa.array([95.2, 87.2, 92.1], type=pa.float32()),
+                pa.array([25, 30, 35], type=pa.int32()),
+            ],
+            schema=schema,
+        )
+
+        append_writer.write_arrow(pa_table)
+        print("Successfully wrote PyArrow Table")
+
+        # Test 2: Write PyArrow RecordBatch
+        print("\n--- Testing PyArrow RecordBatch write ---")
+        pa_record_batch = pa.RecordBatch.from_arrays(
+            [
+                pa.array([4, 5], type=pa.int32()),
+                pa.array(["David", "Eve"], type=pa.string()),
+                pa.array([88.5, 91.0], type=pa.float32()),
+                pa.array([28, 32], type=pa.int32()),
+            ],
+            schema=schema,
+        )
+
+        append_writer.write_arrow_batch(pa_record_batch)
+        print("Successfully wrote PyArrow RecordBatch")
+
+        # Test 3: Write Pandas DataFrame
+        print("\n--- Testing Pandas DataFrame write ---")
+        df = pd.DataFrame(
+            {
+                "id": [6, 7],
+                "name": ["Frank", "Grace"],
+                "score": [89.3, 94.7],
+                "age": [29, 27],
+            }
+        )
+
+        append_writer.write_pandas(df)
+        print("Successfully wrote Pandas DataFrame")
+
+        # Flush all pending data
+        print("\n--- Flushing data ---")
+        append_writer.flush()
+        print("Successfully flushed data")
+
+    except Exception as e:
+        print(f"Error during writing: {e}")
+
+    # Now scan the table to verify data was written
+    print("\n--- Scanning table ---")
+    try:
+        log_scanner = await table.new_log_scanner()
+        print(f"Created log scanner: {log_scanner}")
+
+        # Subscribe to scan from earliest to latest
+        # start_timestamp=None (earliest), end_timestamp=None (latest)
+        log_scanner.subscribe(None, None)
+
+        print("Scanning results using to_arrow():")
+
+        # Try to get as PyArrow Table
+        try:
+            pa_table_result = log_scanner.to_arrow()
+            print(f"\nAs PyArrow Table: {pa_table_result}")
+        except Exception as e:
+            print(f"Could not convert to PyArrow: {e}")
+
+        # Let's subscribe from the beginning again.
+        # Reset subscription
+        log_scanner.subscribe(None, None)
+
+        # Try to get as Pandas DataFrame
+        try:
+            df_result = log_scanner.to_pandas()
+            print(f"\nAs Pandas DataFrame:\n{df_result}")
+        except Exception as e:
+            print(f"Could not convert to Pandas: {e}")
+
+        # TODO: support to_arrow_batch_reader()
+        # which is reserved for streaming use cases
+
+        # TODO: support to_duckdb()
+
+    except Exception as e:
+        print(f"Error during scanning: {e}")
+
+    # Close connection
+    conn.close()
+    print("\nConnection closed")
+
+
+if __name__ == "__main__":
+    # Run the async main function
+    asyncio.run(main())
diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi
new file mode 100644
index 0000000..4565242
--- /dev/null
+++ b/bindings/python/fluss/__init__.pyi
@@ -0,0 +1,171 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Type stubs for Fluss Python bindings."""
+
+from types import TracebackType
+from typing import Dict, List, Optional, Tuple
+
+import pandas as pd
+import pyarrow as pa
+
+class Config:
+    def __init__(self, properties: Optional[Dict[str, str]] = None) -> None: ...
+    @property
+    def bootstrap_server(self) -> Optional[str]: ...
+    @bootstrap_server.setter
+    def bootstrap_server(self, server: str) -> None: ...
+    @property
+    def request_max_size(self) -> int: ...
+    @request_max_size.setter
+    def request_max_size(self, size: int) -> None: ...
+    @property
+    def writer_batch_size(self) -> int: ...
+    @writer_batch_size.setter
+    def writer_batch_size(self, size: int) -> None: ...
+
+class FlussConnection:
+    @staticmethod
+    async def connect(config: Config) -> FlussConnection: ...
+    async def get_admin(self) -> FlussAdmin: ...
+    async def get_table(self, table_path: TablePath) -> FlussTable: ...
+    def close(self) -> None: ...
+    def __enter__(self) -> FlussConnection: ...
+    def __exit__(self, exc_type: Optional[type], exc_value: Optional[BaseException], traceback: Optional[TracebackType]) -> bool: ...
+    def __repr__(self) -> str: ...
+
+class FlussAdmin:
+    async def create_table(
+        self,
+        table_path: TablePath,
+        table_descriptor: TableDescriptor,
+        ignore_if_exists: Optional[bool] = False,
+    ) -> None: ...
+    async def get_table(self, table_path: TablePath) -> TableInfo: ...
+    async def get_latest_lake_snapshot(self, table_path: TablePath) -> LakeSnapshot: ...
+    def __repr__(self) -> str: ...
+
+class FlussTable:
+    async def new_append_writer(self) -> AppendWriter: ...
+    async def new_log_scanner(self) -> LogScanner: ...
+    def get_table_info(self) -> TableInfo: ...
+    def get_table_path(self) -> TablePath: ...
+    def has_primary_key(self) -> bool: ...
+    def __repr__(self) -> str: ...
+
+class AppendWriter:
+    def write_arrow(self, table: pa.Table) -> None: ...
+    def write_arrow_batch(self, batch: pa.RecordBatch) -> None: ...
+    def write_pandas(self, df: pd.DataFrame) -> None: ...
+    def flush(self) -> None: ...
+    def __repr__(self) -> str: ...
+
+class LogScanner:
+    def subscribe(
+        self, start_timestamp: Optional[int], end_timestamp: Optional[int]
+    ) -> None: ...
+    def to_pandas(self) -> pd.DataFrame: ...
+    def to_arrow(self) -> pa.Table: ...
+    def __repr__(self) -> str: ...
+
+class Schema:
+    def __init__(self, schema: pa.Schema, primary_keys: Optional[List[str]] = None) -> None: ...
+    def get_column_names(self) -> List[str]: ...
+    def get_column_types(self) -> List[str]: ...
+    def get_columns(self) -> List[Tuple[str,str]]: ...
+    def __str__(self) -> str: ...
+
+class TableDescriptor:
+    def __init__(self, schema: Schema, **kwargs: str) -> None: ...
+    def get_schema(self) -> Schema: ...
+
+class TablePath:
+    def __init__(self, database: str, table: str) -> None: ...
+    @property
+    def database_name(self) -> str: ...
+    @property
+    def table_name(self) -> str: ...
+    def table_path_str(self) -> str: ...
+    def __str__(self) -> str: ...
+    def __repr__(self) -> str: ...
+    def __hash__(self) -> int: ...
+    def __eq__(self, other: object) -> bool: ...
+
+class TableInfo:
+    @property
+    def table_id(self) -> int: ...
+    @property
+    def schema_id(self) -> int: ...
+    @property
+    def created_time(self) -> int: ...
+    @property
+    def modified_time(self) -> int: ...
+    @property
+    def table_path(self) -> TablePath: ...
+    @property
+    def num_buckets(self) -> int: ...
+    @property
+    def comment(self) -> Optional[str]: ...
+    def get_primary_keys(self) -> List[str]: ...
+    def get_bucket_keys(self) -> List[str]: ...
+    def get_partition_keys(self) -> List[str]: ...
+    def has_primary_key(self) -> bool: ...
+    def is_partitioned(self) -> bool: ...
+    def get_properties(self) -> Dict[str, str]: ...
+    def get_custom_properties(self) -> Dict[str, str]: ...
+    def get_schema(self) -> Schema: ...
+    def get_column_names(self) -> List[str]: ...
+    def get_column_count(self) -> int: ...
+
+class FlussError(Exception):
+    message: str
+    def __init__(self, message: str) -> None: ...
+    def __str__(self) -> str: ...
+
+class LakeSnapshot:
+    def __init__(self, snapshot_id: int) -> None: ...
+    @property
+    def snapshot_id(self) -> int: ...
+    @property
+    def table_buckets_offset(self) -> Dict[TableBucket, int]: ...
+    def get_bucket_offset(self, bucket: TableBucket) -> Optional[int]: ...
+    def get_table_buckets(self) -> List[TableBucket]: ...
+    def __str__(self) -> str: ...
+    def __repr__(self) -> str: ...
+
+class TableBucket:
+    def __init__(self, table_id: int, bucket: int) -> None: ...
+    @staticmethod
+    def with_partition(
+        table_id: int, partition_id: int, bucket: int
+    ) -> TableBucket: ...
+    @property
+    def table_id(self) -> int: ...
+    @property
+    def bucket_id(self) -> int: ...
+    @property
+    def partition_id(self) -> Optional[int]: ...
+    def __hash__(self) -> int: ...
+    def __eq__(self, other: object) -> bool: ...
+    def __str__(self) -> str: ...
+    def __repr__(self) -> str: ...
+
+class TableDistribution:
+    def bucket_keys(self) -> List[str]: ...
+    def bucket_count(self) -> Optional[int]: ...
+
+__version__: str
diff --git a/bindings/python/fluss/py.typed b/bindings/python/fluss/py.typed
new file mode 100644
index 0000000..e69de29

Reply via email to