This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-python.git
The following commit(s) were added to refs/heads/main by this push:
new 107ab37 Add base implementation framework (#3)
107ab37 is described below
commit 107ab37470380ab905d9819472134c0b4d5060bb
Author: yuzelin <[email protected]>
AuthorDate: Fri Aug 16 11:55:28 2024 +0800
Add base implementation framework (#3)
---
.github/workflows/paimon-python-checks.yml | 2 +-
README.md | 2 +-
dev/dev-requirements.txt | 2 +
.../__init__.py | 5 -
java_based_implementation/api_impl.py | 112 +++++++++++++++++++
java_based_implementation/gateway_server.py | 73 +++++++++++++
java_based_implementation/java_gateway.py | 119 +++++++++++++++++++++
.../paimon-python-java-bridge/copyright.txt | 0
.../paimon-python-java-bridge/pom.xml | 0
.../java/org/apache/paimon/python/FileLock.java | 0
.../java/org/apache/paimon/python/NetUtils.java | 0
.../org/apache/paimon/python/PythonEnvUtils.java | 0
.../apache/paimon/python/PythonGatewayServer.java | 0
.../tools/maven/checkstyle.xml | 0
.../tools/maven/suppressions.xml | 0
.../tests/__init__.py | 5 -
.../tests/test_table_scan.py | 11 +-
.../tests/utils.py | 40 ++++---
.../util/__init__.py | 5 -
.../util/constants.py | 10 +-
.../util/exceptions.py | 17 ++-
.../util/java_utils.py | 11 +-
paimon_python_api/catalog.py | 2 +-
23 files changed, 361 insertions(+), 55 deletions(-)
diff --git a/.github/workflows/paimon-python-checks.yml
b/.github/workflows/paimon-python-checks.yml
index edb3a27..13c5c26 100644
--- a/.github/workflows/paimon-python-checks.yml
+++ b/.github/workflows/paimon-python-checks.yml
@@ -23,7 +23,7 @@ on:
pull_request:
paths-ignore:
- 'dev/**'
- - 'java-based-implementation/paimon-python-java-bridge/**'
+ - 'java_based_implementation/paimon-python-java-bridge/**'
- '**/*.md'
concurrency:
diff --git a/README.md b/README.md
index 374ad37..58e0c67 100644
--- a/README.md
+++ b/README.md
@@ -31,7 +31,7 @@ We can use `py4j` to leverage Java code to read Paimon data.
This section descri
### Build paimon-python-java-bridge
```bash
-cd java-based-implementation/paimon-python-java-bridge/
+cd java_based_implementation/paimon-python-java-bridge/
mvn clean install -DskipTests
```
The built target is
java-based-implementation/paimon-python-java-bridge/target/paimon-python-java-bridge-<version>.jar
diff --git a/dev/dev-requirements.txt b/dev/dev-requirements.txt
index df46c0d..c9cfdbc 100755
--- a/dev/dev-requirements.txt
+++ b/dev/dev-requirements.txt
@@ -19,4 +19,6 @@
pip>=20.3
setuptools>=18.0
wheel
+py4j==0.10.9.7
+pyarrow>=5.0.0
pytest~=7.0
diff --git a/dev/dev-requirements.txt b/java_based_implementation/__init__.py
old mode 100755
new mode 100644
similarity index 95%
copy from dev/dev-requirements.txt
copy to java_based_implementation/__init__.py
index df46c0d..65b48d4
--- a/dev/dev-requirements.txt
+++ b/java_based_implementation/__init__.py
@@ -15,8 +15,3 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-
-pip>=20.3
-setuptools>=18.0
-wheel
-pytest~=7.0
diff --git a/java_based_implementation/api_impl.py
b/java_based_implementation/api_impl.py
new file mode 100644
index 0000000..f0b35f1
--- /dev/null
+++ b/java_based_implementation/api_impl.py
@@ -0,0 +1,112 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from java_based_implementation.java_gateway import get_gateway
+from java_based_implementation.util.java_utils import to_j_catalog_context
+from paimon_python_api import catalog, read_builder, table_scan, split,
table_read
+from paimon_python_api import table
+from pyarrow import RecordBatchReader
+from typing import List
+from typing_extensions import Self
+
+
+class Catalog(catalog.Catalog):
+    """Catalog implementation that delegates to a Java catalog object through Py4J."""
+
+    def __init__(self, j_catalog):
+        # Py4J proxy of the Java-side catalog; every call below is forwarded to it.
+        self._j_catalog = j_catalog
+
+    @staticmethod
+    def create(catalog_context: dict) -> 'Catalog':
+        """Create a catalog from a dict of catalog options.
+
+        The dict is converted to a Java CatalogContext and passed to the Java
+        CatalogFactory obtained from the shared gateway.
+        """
+        j_catalog_context = to_j_catalog_context(catalog_context)
+        gateway = get_gateway()
+        j_catalog = gateway.jvm.CatalogFactory.createCatalog(j_catalog_context)
+        return Catalog(j_catalog)
+
+    def get_table(self, identifier: str) -> 'Table':
+        """Return the table named by ``identifier``.
+
+        ``identifier`` is a string because it is parsed on the Java side by
+        ``Identifier.fromString`` (presumably the 'database.table' form -
+        confirm against the Paimon Identifier documentation).
+        """
+        gateway = get_gateway()
+        j_identifier = gateway.jvm.Identifier.fromString(identifier)
+        j_table = self._j_catalog.getTable(j_identifier)
+        return Table(j_table)
+
+
+class Table(table.Table):
+    """Table implementation wrapping a Java table object."""
+
+    def __init__(self, j_table):
+        # Py4J proxy of the Java-side table.
+        self._j_table = j_table
+
+    def new_read_builder(self) -> 'ReadBuilder':
+        """Return a ReadBuilder wrapping the Java table's newReadBuilder() result."""
+        return ReadBuilder(self._j_table.newReadBuilder())
+
+
+class ReadBuilder(read_builder.ReadBuilder):
+    """ReadBuilder implementation wrapping a Java read builder object."""
+
+    def __init__(self, j_read_builder):
+        # Py4J proxy of the Java-side read builder.
+        self._j_read_builder = j_read_builder
+
+    def with_projection(self, projection: List[List[int]]) -> Self:
+        """Forward the projection to the Java builder and return self for chaining."""
+        self._j_read_builder.withProjection(projection)
+        return self
+
+    def with_limit(self, limit: int) -> Self:
+        """Forward the limit to the Java builder and return self for chaining."""
+        self._j_read_builder.withLimit(limit)
+        return self
+
+    def new_scan(self) -> 'TableScan':
+        """Return a TableScan wrapping the Java builder's newScan() result."""
+        return TableScan(self._j_read_builder.newScan())
+
+    def new_read(self) -> 'TableRead':
+        """Not implemented yet; currently returns None."""
+        # TODO
+        return None
+
+
+class TableScan(table_scan.TableScan):
+    """TableScan implementation wrapping a Java table scan object."""
+
+    def __init__(self, j_table_scan):
+        # Py4J proxy of the Java-side table scan.
+        self._j_table_scan = j_table_scan
+
+    def plan(self) -> 'Plan':
+        """Run plan() on the Java scan and wrap the splits of the result in a Plan."""
+        return Plan(self._j_table_scan.plan().splits())
+
+
+class Plan(table_scan.Plan):
+    """Plan implementation holding the Java splits produced by a scan."""
+
+    def __init__(self, j_splits):
+        # Iterable of Java split objects as returned by the Java plan's splits().
+        self._j_splits = j_splits
+
+    def splits(self) -> List['Split']:
+        """Return a new list with every Java split wrapped in a Split."""
+        return [Split(j_split) for j_split in self._j_splits]
+
+
+class Split(split.Split):
+    """Split implementation wrapping a single Java split object."""
+
+    def __init__(self, j_split):
+        # Py4J proxy of the Java-side split.
+        self._j_split = j_split
+
+    def to_j_split(self):
+        """Return the wrapped Java split object."""
+        return self._j_split
+
+
+class TableRead(table_read.TableRead):
+    """TableRead implementation; reading is not implemented yet."""
+
+    def create_reader(self, split: Split) -> RecordBatchReader:
+        """Not implemented yet; currently returns None."""
+        # TODO
+        return None
diff --git a/java_based_implementation/gateway_server.py
b/java_based_implementation/gateway_server.py
new file mode 100644
index 0000000..933669a
--- /dev/null
+++ b/java_based_implementation/gateway_server.py
@@ -0,0 +1,73 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import os
+import platform
+import signal
+
+from subprocess import Popen, PIPE
+from java_based_implementation.util.constants import (PYPAIMON_JVM_ARGS,
PYPAIMON_JAVA_CLASSPATH,
+ PYPAIMON_MAIN_ARGS,
PYPAIMON_MAIN_CLASS)
+
+
+def on_windows():
+    """Return True when platform.system() reports "Windows"."""
+    system_name = platform.system()
+    return system_name == "Windows"
+
+
+def find_java_executable():
+    """Return the java command to run.
+
+    When the JAVA_HOME environment variable is set, the result is
+    ``<JAVA_HOME>/bin/java`` (``java.exe`` on Windows); otherwise it is the
+    bare executable name.
+    """
+    binary_name = "java.exe" if on_windows() else "java"
+    java_home = os.environ.get("JAVA_HOME")
+    if java_home is None:
+        return binary_name
+    return os.path.join(java_home, "bin", binary_name)
+
+
+def launch_gateway_server_process(env):
+    """Start the Java gateway server as a child process.
+
+    :param env: mapping of environment variables for the child process. The
+        JVM arguments, Java classpath and main-class arguments are read from
+        it under the PYPAIMON_* keys.
+    :return: the ``Popen`` handle of the started process, with stdin and
+        stderr connected to pipes.
+    :raises ValueError: if ``env`` holds no (or an empty) Java classpath.
+    """
+    java_executable = find_java_executable()
+    # TODO construct Java module log settings
+    log_settings = []
+    jvm_args = env.get(PYPAIMON_JVM_ARGS, '').split()
+    classpath = env.get(PYPAIMON_JAVA_CLASSPATH)
+    if not classpath:
+        # Fail early with a clear message; a missing value would otherwise
+        # surface as an unrelated TypeError while filtering the command below.
+        raise ValueError(
+            "The Java classpath is not set. Please specify it via the environment "
+            "variable '%s'." % PYPAIMON_JAVA_CLASSPATH)
+    main_args = env.get(PYPAIMON_MAIN_ARGS, '').split()
+    command = [
+        java_executable,
+        *jvm_args,
+        # default jvm args
+        "-XX:+IgnoreUnrecognizedVMOptions",
+        "--add-opens=jdk.proxy2/jdk.proxy2=ALL-UNNAMED",
+        *log_settings,
+        "-cp",
+        classpath,
+        "-c",
+        PYPAIMON_MAIN_CLASS,
+        *main_args
+    ]
+
+    # Must be defined on every platform: Popen only accepts a non-None
+    # preexec_fn on POSIX, so it stays None on Windows.
+    preexec_fn = None
+    if not on_windows():
+        def preexec_func():
+            # ignore ctrl-c / SIGINT
+            signal.signal(signal.SIGINT, signal.SIG_IGN)
+
+        preexec_fn = preexec_func
+    return Popen([c for c in command if len(c) != 0],
+                 stdin=PIPE, stderr=PIPE, preexec_fn=preexec_fn, env=env)
diff --git a/java_based_implementation/java_gateway.py
b/java_based_implementation/java_gateway.py
new file mode 100644
index 0000000..1618b9c
--- /dev/null
+++ b/java_based_implementation/java_gateway.py
@@ -0,0 +1,119 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import os
+import shutil
+import struct
+import tempfile
+import time
+from logging import WARN
+from py4j.java_gateway import (java_import, logger, JavaGateway,
GatewayParameters,
+ CallbackServerParameters)
+from threading import RLock
+
+from java_based_implementation.gateway_server import
launch_gateway_server_process
+from java_based_implementation.util.constants import PYPAIMON_CONN_INFO_PATH
+from java_based_implementation.util.exceptions import install_py4j_hooks
+
+_gateway = None
+_lock = RLock()
+
+
+def get_gateway():
+    # type: () -> JavaGateway
+    """
+    Return the module-wide JavaGateway, launching and initializing it on first use.
+
+    The check and the initialization run while holding the module lock.
+    """
+    global _gateway
+    with _lock:
+        if _gateway is not None:
+            return _gateway
+
+        # Set the level to WARN to mute the noisy INFO level logs
+        logger.level = WARN
+        _gateway = launch_gateway()
+
+        # Tell the Java side where the Python callback server is listening.
+        callback_server = _gateway.get_callback_server()
+        listening_address = callback_server.get_listening_address()
+        listening_port = callback_server.get_listening_port()
+        _gateway.jvm.org.apache.paimon.python.PythonEnvUtils.resetCallbackClient(
+            _gateway.java_gateway_server,
+            listening_address,
+            listening_port)
+        # import the paimon view
+        import_paimon_view(_gateway)
+        # TODO add exception handler for better exception stacktrace
+        install_py4j_hooks()
+        _gateway.entry_point.put("Watchdog", Watchdog())
+        return _gateway
+
+
+def launch_gateway():
+    # type: () -> JavaGateway
+    """
+    Launch the JVM gateway server process and connect a JavaGateway to it.
+
+    The server process is told, through the PYPAIMON_CONN_INFO_PATH environment
+    variable, where to write its connection information; the gateway port is
+    read from the first 4 bytes of that file.
+
+    :return: a JavaGateway connected to the launched process.
+    :raises RuntimeError: if the process exits without writing the file.
+    """
+
+    # Create a temporary directory where the gateway server should write the connection
+    # information.
+    conn_info_dir = tempfile.mkdtemp()
+    try:
+        # Only the unique file name is needed: the file itself is removed again so that
+        # its later existence signals that the server has written it.
+        fd, conn_info_file = tempfile.mkstemp(dir=conn_info_dir)
+        os.close(fd)
+        os.unlink(conn_info_file)
+
+        env = dict(os.environ)
+        env[PYPAIMON_CONN_INFO_PATH] = conn_info_file
+
+        p = launch_gateway_server_process(env)
+
+        # poll() returns None while the process is running. Compare with None explicitly:
+        # an exit code of 0 is falsy and would otherwise keep this loop spinning forever.
+        while p.poll() is None and not os.path.isfile(conn_info_file):
+            time.sleep(0.1)
+
+        if not os.path.isfile(conn_info_file):
+            # Do not let undecodable bytes in stderr hide the real failure.
+            stderr_info = p.stderr.read().decode('utf-8', errors='replace')
+            raise RuntimeError(
+                "Java gateway process exited before sending its port number.\nStderr:\n"
+                + stderr_info
+            )
+
+        with open(conn_info_file, "rb") as info:
+            # The port is stored as a 4-byte unsigned int in network byte order.
+            gateway_port = struct.unpack("!I", info.read(4))[0]
+    finally:
+        shutil.rmtree(conn_info_dir)
+
+    # Connect to the gateway
+    gateway = JavaGateway(
+        gateway_parameters=GatewayParameters(port=gateway_port, auto_convert=True),
+        callback_server_parameters=CallbackServerParameters(
+            port=0, daemonize=True, daemonize_connections=True))
+
+    return gateway
+
+
+# TODO: import more
+def import_paimon_view(gateway):
+    """Import the Paimon Java classes that the Python code accesses via ``gateway.jvm``.
+
+    Without these imports the short names used by the Python wrappers
+    (CatalogFactory, Identifier, CatalogContext, Options) are not resolved to
+    Java classes.
+    """
+    java_import(gateway.jvm, "org.apache.paimon.table.*")
+    java_import(gateway.jvm, "org.apache.paimon.options.Options")
+    java_import(gateway.jvm, "org.apache.paimon.catalog.*")
+
+
+class Watchdog(object):
+    """
+    Callback object handed to the Java side so that it can check whether its
+    parent process is alive.
+    """
+
+    def ping(self):
+        """Sleep for 10 seconds, then return True."""
+        time.sleep(10)
+        return True
+
+    class Java:
+        # Py4J callback declaration: the Java interface this object implements.
+        implements = ["org.apache.paimon.python.PythonGatewayServer$Watchdog"]
diff --git a/java-based-implementation/paimon-python-java-bridge/copyright.txt
b/java_based_implementation/paimon-python-java-bridge/copyright.txt
similarity index 100%
rename from java-based-implementation/paimon-python-java-bridge/copyright.txt
rename to java_based_implementation/paimon-python-java-bridge/copyright.txt
diff --git a/java-based-implementation/paimon-python-java-bridge/pom.xml
b/java_based_implementation/paimon-python-java-bridge/pom.xml
similarity index 100%
rename from java-based-implementation/paimon-python-java-bridge/pom.xml
rename to java_based_implementation/paimon-python-java-bridge/pom.xml
diff --git
a/java-based-implementation/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/FileLock.java
b/java_based_implementation/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/FileLock.java
similarity index 100%
rename from
java-based-implementation/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/FileLock.java
rename to
java_based_implementation/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/FileLock.java
diff --git
a/java-based-implementation/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/NetUtils.java
b/java_based_implementation/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/NetUtils.java
similarity index 100%
rename from
java-based-implementation/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/NetUtils.java
rename to
java_based_implementation/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/NetUtils.java
diff --git
a/java-based-implementation/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/PythonEnvUtils.java
b/java_based_implementation/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/PythonEnvUtils.java
similarity index 100%
rename from
java-based-implementation/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/PythonEnvUtils.java
rename to
java_based_implementation/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/PythonEnvUtils.java
diff --git
a/java-based-implementation/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/PythonGatewayServer.java
b/java_based_implementation/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/PythonGatewayServer.java
similarity index 100%
rename from
java-based-implementation/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/PythonGatewayServer.java
rename to
java_based_implementation/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/PythonGatewayServer.java
diff --git
a/java-based-implementation/paimon-python-java-bridge/tools/maven/checkstyle.xml
b/java_based_implementation/paimon-python-java-bridge/tools/maven/checkstyle.xml
similarity index 100%
rename from
java-based-implementation/paimon-python-java-bridge/tools/maven/checkstyle.xml
rename to
java_based_implementation/paimon-python-java-bridge/tools/maven/checkstyle.xml
diff --git
a/java-based-implementation/paimon-python-java-bridge/tools/maven/suppressions.xml
b/java_based_implementation/paimon-python-java-bridge/tools/maven/suppressions.xml
similarity index 100%
rename from
java-based-implementation/paimon-python-java-bridge/tools/maven/suppressions.xml
rename to
java_based_implementation/paimon-python-java-bridge/tools/maven/suppressions.xml
diff --git a/dev/dev-requirements.txt
b/java_based_implementation/tests/__init__.py
old mode 100755
new mode 100644
similarity index 95%
copy from dev/dev-requirements.txt
copy to java_based_implementation/tests/__init__.py
index df46c0d..65b48d4
--- a/dev/dev-requirements.txt
+++ b/java_based_implementation/tests/__init__.py
@@ -15,8 +15,3 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-
-pip>=20.3
-setuptools>=18.0
-wheel
-pytest~=7.0
diff --git a/dev/dev-requirements.txt
b/java_based_implementation/tests/test_table_scan.py
old mode 100755
new mode 100644
similarity index 90%
copy from dev/dev-requirements.txt
copy to java_based_implementation/tests/test_table_scan.py
index df46c0d..2e9a939
--- a/dev/dev-requirements.txt
+++ b/java_based_implementation/tests/test_table_scan.py
@@ -16,7 +16,10 @@
# limitations under the License.
################################################################################
-pip>=20.3
-setuptools>=18.0
-wheel
-pytest~=7.0
+import unittest
+
+
+class TableScanTest(unittest.TestCase):
+    """Placeholder test case for table scans."""
+
+    def test_splits_size(self):
+        """Not implemented yet; passes trivially."""
+        return None
diff --git a/.github/workflows/paimon-python-checks.yml
b/java_based_implementation/tests/utils.py
similarity index 62%
copy from .github/workflows/paimon-python-checks.yml
copy to java_based_implementation/tests/utils.py
index edb3a27..05573f2 100644
--- a/.github/workflows/paimon-python-checks.yml
+++ b/java_based_implementation/tests/utils.py
@@ -16,26 +16,24 @@
# limitations under the License.
################################################################################
-name: Check Code Style & Run Tests
+import os
+import shutil
+import subprocess
+import tempfile
-on:
- push:
- pull_request:
- paths-ignore:
- - 'dev/**'
- - 'java-based-implementation/paimon-python-java-bridge/**'
- - '**/*.md'
-concurrency:
- group: ${{ github.workflow }}-${{ github.event_name }}-${{
github.event.number || github.run_id }}
- cancel-in-progress: true
-
-jobs:
- lint-python:
- runs-on: ubuntu-latest
-
- steps:
- - name: Run lint-python.sh
- run: |
- chmod +x dev/lint-python.sh
- ./dev/lint-python.sh
+def set_bridge_jar() -> str:
+    """Build the paimon-python-java-bridge jar with Maven and move it to a temp dir.
+
+    :return: path of the jar inside a newly created temporary directory.
+    :raises subprocess.CalledProcessError: if the Maven build fails.
+    """
+    # Resolve the module relative to this file so that the helper does not
+    # depend on the current working directory.
+    java_module = os.path.join(
+        os.path.dirname(os.path.abspath(__file__)), '..', 'paimon-python-java-bridge')
+    # build paimon-python-java-bridge
+    subprocess.run(
+        ["mvn", "clean", "package"],
+        cwd=java_module,
+        stdout=subprocess.PIPE,
+        stderr=subprocess.PIPE,
+        # Fail here when the build fails, not later with a confusing
+        # "file not found" while moving the jar.
+        check=True
+    )
+    jar_name = 'paimon-python-java-bridge-0.9-SNAPSHOT.jar'
+    jar_file = os.path.join(java_module, 'target', jar_name)
+    # move to temp dir
+    temp_dir = tempfile.mkdtemp()
+    shutil.move(jar_file, temp_dir)
+    return os.path.join(temp_dir, jar_name)
diff --git a/dev/dev-requirements.txt
b/java_based_implementation/util/__init__.py
old mode 100755
new mode 100644
similarity index 95%
copy from dev/dev-requirements.txt
copy to java_based_implementation/util/__init__.py
index df46c0d..65b48d4
--- a/dev/dev-requirements.txt
+++ b/java_based_implementation/util/__init__.py
@@ -15,8 +15,3 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-
-pip>=20.3
-setuptools>=18.0
-wheel
-pytest~=7.0
diff --git a/dev/dev-requirements.txt
b/java_based_implementation/util/constants.py
old mode 100755
new mode 100644
similarity index 74%
copy from dev/dev-requirements.txt
copy to java_based_implementation/util/constants.py
index df46c0d..19f6eaf
--- a/dev/dev-requirements.txt
+++ b/java_based_implementation/util/constants.py
@@ -16,7 +16,9 @@
# limitations under the License.
################################################################################
-pip>=20.3
-setuptools>=18.0
-wheel
-pytest~=7.0
+# ---------------------------- for env var ----------------------------
+# Names of the environment variables read when launching the Java gateway.
+PYPAIMON_CONN_INFO_PATH = '_PYPAIMON_CONN_INFO_PATH'
+PYPAIMON_JVM_ARGS = '_PYPAIMON_JVM_ARGS'
+PYPAIMON_JAVA_CLASSPATH = '_PYPAIMON_JAVA_CLASSPATH'
+PYPAIMON_MAIN_ARGS = '_PYPAIMON_MAIN_ARGS'
+
+# Fully qualified name of the Java main class (a value, not an env var name).
+PYPAIMON_MAIN_CLASS = 'org.apache.paimon.python.PythonGatewayServer'
diff --git a/dev/dev-requirements.txt
b/java_based_implementation/util/exceptions.py
old mode 100755
new mode 100644
similarity index 66%
copy from dev/dev-requirements.txt
copy to java_based_implementation/util/exceptions.py
index df46c0d..453d28b
--- a/dev/dev-requirements.txt
+++ b/java_based_implementation/util/exceptions.py
@@ -16,7 +16,16 @@
# limitations under the License.
################################################################################
-pip>=20.3
-setuptools>=18.0
-wheel
-pytest~=7.0
+import py4j
+
+
+def install_py4j_hooks():
+    """
+    Hook the classes such as JavaPackage, etc. of Py4j to improve the exception message.
+
+    After this call, calling a JavaPackage (which is what happens when a Java
+    class name cannot be resolved) raises a TypeError naming the missing class.
+    """
+    # Import the submodule explicitly: a bare "import py4j" does not guarantee
+    # that the "py4j.java_gateway" attribute is available.
+    import py4j.java_gateway
+
+    def wrapped_call(self, *args, **kwargs):
+        raise TypeError(
+            "Could not find the Java class '%s'. The Java dependencies could be specified via "
+            "the environment variable '_PYPAIMON_JAVA_CLASSPATH'." % self._fqn)
+
+    setattr(py4j.java_gateway.JavaPackage, '__call__', wrapped_call)
diff --git a/dev/dev-requirements.txt
b/java_based_implementation/util/java_utils.py
old mode 100755
new mode 100644
similarity index 79%
copy from dev/dev-requirements.txt
copy to java_based_implementation/util/java_utils.py
index df46c0d..83a3194
--- a/dev/dev-requirements.txt
+++ b/java_based_implementation/util/java_utils.py
@@ -16,7 +16,10 @@
# limitations under the License.
################################################################################
-pip>=20.3
-setuptools>=18.0
-wheel
-pytest~=7.0
+from java_based_implementation.java_gateway import get_gateway
+
+
+def to_j_catalog_context(catalog_context: dict):
+    """Build a Java CatalogContext from a Python dict of catalog options.
+
+    The dict is first wrapped in a Java Options object, which is then passed
+    to CatalogContext.create.
+    """
+    jvm = get_gateway().jvm
+    return jvm.CatalogContext.create(jvm.Options(catalog_context))
diff --git a/paimon_python_api/catalog.py b/paimon_python_api/catalog.py
index 9f00515..3166761 100644
--- a/paimon_python_api/catalog.py
+++ b/paimon_python_api/catalog.py
@@ -32,5 +32,5 @@ class Catalog(ABC):
"""Create catalog from configuration."""
@abstractmethod
- def get_table(self, identifier: tuple) -> Table:
+ def get_table(self, identifier: str) -> Table:
"""Get paimon table identified by the given Identifier."""