This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f1c3293a1 [python] ray version compatible (#6937)
7f1c3293a1 is described below

commit 7f1c3293a15ebeff994f4e90d337b57e9de202de
Author: XiaoHongbo <[email protected]>
AuthorDate: Wed Dec 31 18:16:53 2025 +0800

    [python] ray version compatible (#6937)
---
 .github/workflows/paimon-python-checks.yml    | 104 ++++++++++++++++++++++++++
 paimon-python/pypaimon/read/ray_datasource.py |  50 +++++++++----
 2 files changed, 138 insertions(+), 16 deletions(-)

diff --git a/.github/workflows/paimon-python-checks.yml 
b/.github/workflows/paimon-python-checks.yml
index 6df806a4f4..4fb7fe07e4 100755
--- a/.github/workflows/paimon-python-checks.yml
+++ b/.github/workflows/paimon-python-checks.yml
@@ -101,3 +101,107 @@ jobs:
         run: |
           chmod +x paimon-python/dev/lint-python.sh
           ./paimon-python/dev/lint-python.sh
+
+  requirement_version_compatible_test:
+    runs-on: ubuntu-latest
+    container: "python:3.10-slim"
+
+    steps:
+      - name: Checkout code
+        uses: actions/checkout@v2
+
+      - name: Set up JDK ${{ env.JDK_VERSION }}
+        uses: actions/setup-java@v4
+        with:
+          java-version: ${{ env.JDK_VERSION }}
+          distribution: 'temurin'
+
+      - name: Set up Maven
+        uses: stCarolas/[email protected]
+        with:
+          maven-version: 3.8.8
+
+      - name: Install system dependencies
+        shell: bash
+        run: |
+          apt-get update && apt-get install -y \
+            build-essential \
+            git \
+            curl \
+            && rm -rf /var/lib/apt/lists/*
+
+      - name: Verify Java and Maven installation
+        run: |
+          java -version
+          mvn -version
+
+      - name: Verify Python version
+        run: python --version
+
+      - name: Build Java
+        run: |
+          echo "Start compiling modules"
+          mvn -T 2C -B clean install -DskipTests
+
+      - name: Install base Python dependencies
+        shell: bash
+        run: |
+          python -m pip install --upgrade pip
+          python -m pip install -q \
+            pyroaring \
+            readerwriterlock==1.0.9 \
+            fsspec==2024.3.1 \
+            cachetools==5.3.3 \
+            ossfs==2023.12.0 \
+            fastavro==1.11.1 \
+            pyarrow==16.0.0 \
+            zstandard==0.24.0 \
+            polars==1.32.0 \
+            duckdb==1.3.2 \
+            numpy==1.24.3 \
+            pandas==2.0.3 \
+            pytest~=7.0 \
+            py4j==0.10.9.9 \
+            requests \
+            parameterized==0.9.0 \
+            packaging
+
+      - name: Test requirement version compatibility
+        shell: bash
+        run: |
+          cd paimon-python
+          
+          # Test Ray version compatibility
+          echo "=========================================="
+          echo "Testing Ray version compatibility"
+          echo "=========================================="
+          for ray_version in 2.44.0 2.48.0 2.53.0; do
+            echo "Testing Ray version: $ray_version"
+            
+            # Install specific Ray version
+            python -m pip install -q ray==$ray_version
+            
+            # Verify Ray version
+            python -c "import ray; print(f'Ray version: {ray.__version__}')"
+            python -c "from packaging.version import parse; import ray; assert 
parse(ray.__version__) == parse('$ray_version'), f'Expected Ray $ray_version, 
got {ray.__version__}'"
+            
+            # Run tests
+            python -m pytest pypaimon/tests/ray_data_test.py::RayDataTest -v 
--tb=short || {
+              echo "Tests failed for Ray $ray_version"
+              exit 1
+            }
+            
+            # Uninstall Ray to avoid conflicts
+            python -m pip uninstall -y ray
+          done
+          
+          # Add other dependency version tests here in the future
+          # Example:
+          # echo "=========================================="
+          # echo "Testing PyArrow version compatibility"
+          # echo "=========================================="
+          # for pyarrow_version in 16.0.0 17.0.0 18.0.0; do
+          #   ...
+          # done
+        env:
+          PYTHONPATH: ${{ github.workspace }}/paimon-python
diff --git a/paimon-python/pypaimon/read/ray_datasource.py 
b/paimon-python/pypaimon/read/ray_datasource.py
index 9a13ae0fa1..905c8bddef 100644
--- a/paimon-python/pypaimon/read/ray_datasource.py
+++ b/paimon-python/pypaimon/read/ray_datasource.py
@@ -25,6 +25,8 @@ from functools import partial
 from typing import List, Optional, Iterable
 
 import pyarrow
+from packaging.version import parse
+import ray
 
 from pypaimon.read.split import Split
 from pypaimon.read.table_read import TableRead
@@ -32,6 +34,10 @@ from pypaimon.schema.data_types import PyarrowFieldParser
 
 logger = logging.getLogger(__name__)
 
+# Ray version constants for compatibility
+RAY_VERSION_SCHEMA_IN_READ_TASK = "2.48.0"  # Schema moved from BlockMetadata 
to ReadTask
+RAY_VERSION_PER_TASK_ROW_LIMIT = "2.52.0"  # per_task_row_limit parameter 
introduced
+
 from ray.data.datasource import Datasource
 
 
@@ -94,11 +100,13 @@ class PaimonDatasource(Datasource):
 
         return chunks
 
-    def get_read_tasks(self, parallelism: int) -> List:
+    def get_read_tasks(self, parallelism: int, **kwargs) -> List:
         """Return a list of read tasks that can be executed in parallel."""
         from ray.data.datasource import ReadTask
         from ray.data.block import BlockMetadata
 
+        per_task_row_limit = kwargs.get('per_task_row_limit', None)
+
         # Validate parallelism parameter
         if parallelism < 1:
             raise ValueError(f"parallelism must be at least 1, got 
{parallelism}")
@@ -191,20 +199,30 @@ class PaimonDatasource(Datasource):
                 num_rows = total_rows if total_rows > 0 else None
             size_bytes = total_size if total_size > 0 else None
 
-            metadata = BlockMetadata(
-                num_rows=num_rows,
-                size_bytes=size_bytes,
-                input_files=input_files if input_files else None,
-                exec_stats=None,  # Will be populated by Ray during execution
-            )
-
-            # TODO: per_task_row_limit is not supported in Ray 2.48.0, will be 
added in future versions
-            read_tasks.append(
-                ReadTask(
-                    read_fn=lambda splits=chunk_splits: get_read_task(splits),
-                    metadata=metadata,
-                    schema=schema,
-                )
-            )
+            metadata_kwargs = {
+                'num_rows': num_rows,
+                'size_bytes': size_bytes,
+                'input_files': input_files if input_files else None,
+                'exec_stats': None,  # Will be populated by Ray during 
execution
+            }
+
+            if parse(ray.__version__) < parse(RAY_VERSION_SCHEMA_IN_READ_TASK):
+                metadata_kwargs['schema'] = schema
+
+            metadata = BlockMetadata(**metadata_kwargs)
+
+            read_fn = partial(get_read_task, chunk_splits)
+            read_task_kwargs = {
+                'read_fn': read_fn,
+                'metadata': metadata,
+            }
+            
+            if parse(ray.__version__) >= 
parse(RAY_VERSION_SCHEMA_IN_READ_TASK):
+                read_task_kwargs['schema'] = schema
+            
+            if parse(ray.__version__) >= parse(RAY_VERSION_PER_TASK_ROW_LIMIT) 
and per_task_row_limit is not None:
+                read_task_kwargs['per_task_row_limit'] = per_task_row_limit
+
+            read_tasks.append(ReadTask(**read_task_kwargs))
 
         return read_tasks

Reply via email to