This is an automated email from the ASF dual-hosted git repository.
tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 26b18c6e6f1 Revert "[yaml] : switch js2py to pythonmonkey (#37560)" (#37665)
26b18c6e6f1 is described below
commit 26b18c6e6f14e13082e565f6ef5e766d294fa25f
Author: Derrick Williams <[email protected]>
AuthorDate: Mon Feb 23 18:52:48 2026 -0500
Revert "[yaml] : switch js2py to pythonmonkey (#37560)" (#37665)
This reverts commit 5d6cb04ea05b5e981ac7fbf0122135870d7b2a4c.
---
sdks/python/apache_beam/yaml/standard_io.yaml | 1 -
sdks/python/apache_beam/yaml/yaml_mapping.py | 249 +++++++++-----------------
sdks/python/apache_beam/yaml/yaml_udf_test.py | 21 +--
sdks/python/setup.py | 2 +
4 files changed, 93 insertions(+), 180 deletions(-)
diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml
index d510d73562c..e62b3a562c3 100644
--- a/sdks/python/apache_beam/yaml/standard_io.yaml
+++ b/sdks/python/apache_beam/yaml/standard_io.yaml
@@ -145,7 +145,6 @@
path: 'path'
'WriteToJson':
path: 'path'
- num_shards: 'num_shards'
'ReadFromParquet':
path: 'file_pattern'
'WriteToParquet':
diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py
index 69b38db4173..a6b2b570475 100644
--- a/sdks/python/apache_beam/yaml/yaml_mapping.py
+++ b/sdks/python/apache_beam/yaml/yaml_mapping.py
@@ -16,15 +16,8 @@
#
"""This module defines the basic MapToFields operation."""
-import atexit
-import importlib
import itertools
-import os
-import queue
import re
-import sys
-import threading
-import uuid
from collections import abc
from collections.abc import Callable
from collections.abc import Collection
@@ -60,6 +53,14 @@ from apache_beam.yaml.yaml_errors import maybe_with_exception_handling
from apache_beam.yaml.yaml_errors import
maybe_with_exception_handling_transform_fn
from apache_beam.yaml.yaml_provider import dicts_to_rows
+# Import js2py package if it exists
+try:
+ import js2py
+ from js2py.base import JsObjectWrapper
+except ImportError:
+ js2py = None
+ JsObjectWrapper = object
+
_str_expression_fields = {
'AssignTimestamps': 'timestamp',
'Filter': 'keep',
@@ -177,6 +178,20 @@ def _check_mapping_arguments(
raise ValueError(f'{transform_name} cannot specify "name" without "path"')
+# js2py's JsObjectWrapper object has a self-referencing __dict__ property
+# that cannot be pickled without implementing the __getstate__ and
+# __setstate__ methods.
+class _CustomJsObjectWrapper(JsObjectWrapper):
+ def __init__(self, js_obj):
+ super().__init__(js_obj.__dict__['_obj'])
+
+ def __getstate__(self):
+ return self.__dict__.copy()
+
+ def __setstate__(self, state):
+ self.__dict__.update(state)
+
+
# TODO(yaml) Improve type inferencing for JS UDF's
def py_value_to_js_dict(py_value):
if ((isinstance(py_value, tuple) and hasattr(py_value, '_asdict')) or
@@ -190,181 +205,85 @@ def py_value_to_js_dict(py_value):
return py_value
-class PythonMonkeyDispatcher:
- """Dispatcher for executing JavaScript code using pythonmonkey.
-
- This class manages a worker thread to execute JavaScript, ensuring that
- pythonmonkey is only imported and used within that thread. It also handles
- process shutdown carefully to avoid segmentation faults known to occur
- when pythonmonkey is present during standard Python interpreter finalization.
- """
- def __init__(self):
- self._req_queue = queue.Queue()
- self._resp_events = {}
- self._resp_data = {}
- self._lock = threading.Lock()
- self._thread = threading.Thread(target=self._worker, daemon=True)
- self._started = False
- # Register the stop method to be called on exit.
- # atexit handlers are executed in LIFO order. By registering at import time,
- # we ensure this handler runs last, allowing other cleanup handlers
- # (registered later) to execute first.
- atexit.register(self.stop)
-
- def start(self):
- with self._lock:
- if not self._started:
- self._thread.start()
- self._started = True
-
- def stop(self):
- # This method is called on process exit.
- if not self._started:
- return
- # Flush standard streams before forced exit to avoid data loss.
- try:
- sys.stdout.flush()
- sys.stderr.flush()
- except Exception:
- pass
- # Force an immediate exit to avoid a segmentation fault that occurs with
- # pythonmonkey during standard interpreter finalization.
- # Since this runs as one of the last atexit handlers (due to import-time
- # registration), most other cleanup should have already completed.
- os._exit(0)
-
- def _worker(self):
- try:
- import pythonmonkey as pm
- except ImportError:
- pm = None
-
- self._pm = pm
- self._cache = {}
-
- while True:
- req = self._req_queue.get()
- if req is None:
- break
-
- req_id, type_str, payload = req
- res = None
- is_err = False
- try:
- if self._pm is None:
- raise ImportError(
- "PythonMonkey not installed or failed to import in worker
thread."
- )
-
- if type_str == 'exec':
- source, row = payload
- if source not in self._cache:
- self._cache[source] = self._pm.eval(f"({source})")
- func = self._cache[source]
- res = func(row)
- except Exception as e:
- res = e
- is_err = True
-
- with self._lock:
- if req_id in self._resp_events:
- self._resp_data[req_id] = (is_err, res)
- self._resp_events[req_id].set()
-
- def eval_and_run(self, source, row):
- if not self._started:
- self.start()
-
- req_id = str(uuid.uuid4())
- event = threading.Event()
- with self._lock:
- self._resp_events[req_id] = event
-
- self._req_queue.put((req_id, 'exec', (source, row)))
- event.wait()
-
- with self._lock:
- is_err, result = self._resp_data.pop(req_id)
- del self._resp_events[req_id]
-
- if is_err:
- raise result
- return result
-
-
-_pythonmonkey_dispatcher = PythonMonkeyDispatcher()
-
-
-class JavaScriptCallable:
- def __init__(self, source, name=None):
- self._source = source
- self._name = name
-
- def __call__(self, row):
- # Check for pythonmonkey availability lazily (on first call)
- if importlib.util.find_spec("pythonmonkey") is None:
- raise RuntimeError(
- "PythonMonkey is not installed. Please install 'apache_beam[yaml]' "
- "to use JavaScript mapping functions.")
-
- row_as_dict = py_value_to_js_dict(row)
- try:
- # If we have a name, it means we evaluated a file and need to call
- # a specific function.
- # Dispatcher expects a self-contained source/expression.
- if self._name:
- # Wrap: (function() { <source>; return <name>; })()
- effective_source = (
- f"(function() {{ {self._source}; return {self._name}; }})()")
- else:
- # Expression/Callable case: Wrap in parens to be safe
- effective_source = f"({self._source})"
-
- js_result = _pythonmonkey_dispatcher.eval_and_run(
- effective_source, row_as_dict)
-
- except Exception as exn:
- raise RuntimeError(
- f"Error evaluating javascript expression: {exn}") from exn
- return dicts_to_rows(_finalize_js_result(js_result))
-
-
-def _finalize_js_result(obj):
- """Coerce pythonmonkey objects to native Python objects (specifically
- strings).
- """
- if isinstance(obj, str):
- return str(obj)
- if isinstance(obj, list):
- return [_finalize_js_result(x) for x in obj]
- if isinstance(obj, dict):
- return {k: _finalize_js_result(v) for k, v in obj.items()}
- return obj
-
-
+# TODO(yaml) Consider adding optional language version parameter to support
+# ECMAScript 5 and 6
def _expand_javascript_mapping_func(
original_fields, expression=None, callable=None, path=None, name=None):
- if importlib.util.find_spec("pythonmonkey") is None:
+ # Check for installed js2py package
+ if js2py is None:
raise ValueError(
- "PythonMonkey is not installed. Please install 'apache_beam[yaml]' "
- "to use JavaScript mapping functions.")
+ "Javascript mapping functions are not supported on"
+ " Python 3.12 or later.")
+
+ # import remaining js2py objects
+ from js2py import base
+ from js2py.constructors import jsdate
+ from js2py.internals import simplex
+
+ js_array_type = (
+ base.PyJsArray,
+ base.PyJsArrayBuffer,
+ base.PyJsInt8Array,
+ base.PyJsUint8Array,
+ base.PyJsUint8ClampedArray,
+ base.PyJsInt16Array,
+ base.PyJsUint16Array,
+ base.PyJsInt32Array,
+ base.PyJsUint32Array,
+ base.PyJsFloat32Array,
+ base.PyJsFloat64Array)
+
+ def _js_object_to_py_object(obj):
+ if isinstance(obj, (base.PyJsNumber, base.PyJsString, base.PyJsBoolean)):
+ return base.to_python(obj)
+ elif isinstance(obj, js_array_type):
+ return [_js_object_to_py_object(value) for value in obj.to_list()]
+ elif isinstance(obj, jsdate.PyJsDate):
+ return obj.to_utc_dt()
+ elif isinstance(obj, (base.PyJsNull, base.PyJsUndefined)):
+ return None
+ elif isinstance(obj, base.PyJsError):
+ raise RuntimeError(obj['message'])
+ elif isinstance(obj, base.PyJsObject):
+ return {
+ key: _js_object_to_py_object(value['value'])
+ for (key, value) in obj.own.items()
+ }
+ elif isinstance(obj, base.JsObjectWrapper):
+ return _js_object_to_py_object(obj._obj)
+
+ return obj
if expression:
source = '\n'.join(['function(__row__) {'] + [
f' {name} = __row__.{name}'
for name in original_fields if name in expression
] + [' return (' + expression + ')'] + ['}'])
- return JavaScriptCallable(source)
+ js_func = _CustomJsObjectWrapper(js2py.eval_js(source))
elif callable:
- return JavaScriptCallable(callable)
+ js_func = _CustomJsObjectWrapper(js2py.eval_js(callable))
else:
if not path.endswith('.js'):
raise ValueError(f'File "{path}" is not a valid .js file.')
udf_code = FileSystems.open(path).read().decode()
- return JavaScriptCallable(udf_code, name=name)
+ js = js2py.EvalJs()
+ js.eval(udf_code)
+ js_func = _CustomJsObjectWrapper(getattr(js, name))
+
+ def js_wrapper(row):
+ row_as_dict = py_value_to_js_dict(row)
+ try:
+ js_result = js_func(row_as_dict)
+ except simplex.JsException as exn:
+ raise RuntimeError(
+ f"Error evaluating javascript expression: "
+ f"{exn.mes['message']}") from exn
+ return dicts_to_rows(_js_object_to_py_object(js_result))
+
+ return js_wrapper
def _expand_python_mapping_func(
diff --git a/sdks/python/apache_beam/yaml/yaml_udf_test.py b/sdks/python/apache_beam/yaml/yaml_udf_test.py
index e6d0a0af41a..3d664ab9de4 100644
--- a/sdks/python/apache_beam/yaml/yaml_udf_test.py
+++ b/sdks/python/apache_beam/yaml/yaml_udf_test.py
@@ -14,7 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-import importlib
import logging
import os
import shutil
@@ -32,17 +31,11 @@ from apache_beam.yaml.yaml_mapping import py_value_to_js_dict
from apache_beam.yaml.yaml_provider import dicts_to_rows
from apache_beam.yaml.yaml_transform import YamlTransform
-# We use find_spec to check for pythonmonkey availability without importing it.
-# Importing pythonmonkey initializes the engine and binds it to the current
-# thread (MainThread). This causes "too much recursion" errors when the
-# Dispatcher later tries to use it from a background thread.
try:
- pm_available = importlib.util.find_spec("pythonmonkey") is not None
+ import js2py
except ImportError:
- pm_available = False
-
-if not pm_available:
- logging.warning('pythonmonkey is not installed; some tests will be skipped.')
+ js2py = None
+ logging.warning('js2py is not installed; some tests will be skipped.')
def as_rows():
@@ -70,7 +63,7 @@ class YamlUDFMappingTest(unittest.TestCase):
def tearDown(self):
shutil.rmtree(self.tmpdir)
- @unittest.skipIf(not pm_available, 'pythonmonkey not installed.')
+ @unittest.skipIf(js2py is None, 'js2py not installed.')
def test_map_to_fields_filter_inline_js(self):
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
pickle_library='cloudpickle', yaml_experimental_features=['javascript'
@@ -204,7 +197,7 @@ class YamlUDFMappingTest(unittest.TestCase):
beam.Row(label='389a', timestamp=2, label_copy="389a"),
]))
- @unittest.skipIf(not pm_available, 'pythonmonkey not installed.')
+ @unittest.skipIf(js2py is None, 'js2py not installed.')
def test_filter_inline_js(self):
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
pickle_library='cloudpickle', yaml_experimental_features=['javascript'
@@ -259,7 +252,7 @@ class YamlUDFMappingTest(unittest.TestCase):
row=beam.Row(rank=2, values=[7, 8, 9])),
]))
- @unittest.skipIf(not pm_available, 'pythonmonkey not installed.')
+ @unittest.skipIf(js2py is None, 'js2py not installed.')
def test_filter_expression_js(self):
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
pickle_library='cloudpickle', yaml_experimental_features=['javascript'
@@ -303,7 +296,7 @@ class YamlUDFMappingTest(unittest.TestCase):
row=beam.Row(rank=0, values=[1, 2, 3])),
]))
- @unittest.skipIf(not pm_available, 'pythonmonkey not installed.')
+ @unittest.skipIf(js2py is None, 'js2py not installed.')
def test_filter_inline_js_file(self):
data = '''
function f(x) {
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 313b1528ed7..75b0cda7135 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -575,6 +575,8 @@ if __name__ == '__main__':
'docstring-parser>=0.15,<1.0',
'jinja2>=3.0,<3.2',
'virtualenv-clone>=0.5,<1.0',
+ # https://github.com/PiotrDabkowski/Js2Py/issues/317
+ 'js2py>=0.74,<1; python_version<"3.12"',
'jsonschema>=4.0.0,<5.0.0',
] + dataframe_dependency,
# Keep the following dependencies in line with what we test against