derrickaw commented on code in PR #37560:
URL: https://github.com/apache/beam/pull/37560#discussion_r2793166208


##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -205,85 +189,162 @@ def py_value_to_js_dict(py_value):
     return py_value
 
 
-# TODO(yaml) Consider adding optional language version parameter to support
-#  ECMAScript 5 and 6
+class PythonMonkeyDispatcher:
+  def __init__(self):
+    self._req_queue = queue.Queue()
+    self._resp_events = {}
+    self._resp_data = {}
+    self._lock = threading.Lock()
+    self._thread = threading.Thread(target=self._worker, daemon=True)
+    self._started = False
+
+  def start(self):
+    with self._lock:
+      if not self._started:
+        self._thread.start()
+        self._started = True
+        atexit.register(self.stop)
+
+  def stop(self):
+    # This method is called on process exit.
+    # We force an immediate exit to avoid a segmentation fault that often
+    # occurs with pythonmonkey during standard Python interpreter finalization.
+    os._exit(0)
+
+  def _worker(self):
+    try:
+      import pythonmonkey as pm
+    except ImportError:
+      pm = None
+
+    self._pm = pm
+    self._cache = {}
+
+    while True:
+      req = self._req_queue.get()
+      if req is None:
+        break
+
+      req_id, type_str, payload = req
+      res = None
+      is_err = False
+      try:
+        if self._pm is None:
+          raise ImportError(
+              "PythonMonkey not installed or failed to import in worker thread."
+          )
+
+        if type_str == 'exec':
+          source, row = payload
+          if source not in self._cache:
+            self._cache[source] = self._pm.eval(f"({source})")
+          func = self._cache[source]
+          res = func(row)
+      except Exception as e:
+        res = e
+        is_err = True
+
+      with self._lock:
+        if req_id in self._resp_events:
+          self._resp_data[req_id] = (is_err, res)
+          self._resp_events[req_id].set()
+
+  def eval_and_run(self, source, row):
+    if not self._started:
+      self.start()
+
+    req_id = str(uuid.uuid4())
+    event = threading.Event()
+    with self._lock:
+      self._resp_events[req_id] = event
+
+    self._req_queue.put((req_id, 'exec', (source, row)))
+    event.wait()
+
+    with self._lock:
+      is_err, result = self._resp_data.pop(req_id)
+      del self._resp_events[req_id]
+
+    if is_err:
+      raise result
+    return result
+
+
+_pythonmonkey_dispatcher = PythonMonkeyDispatcher()
+
+
+class JavaScriptCallable:
+  def __init__(self, source, original_fields=None, name=None):

Review Comment:
   done



##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -205,85 +189,162 @@ def py_value_to_js_dict(py_value):
     return py_value
 
 
-# TODO(yaml) Consider adding optional language version parameter to support
-#  ECMAScript 5 and 6
+class PythonMonkeyDispatcher:
+  def __init__(self):
+    self._req_queue = queue.Queue()
+    self._resp_events = {}
+    self._resp_data = {}
+    self._lock = threading.Lock()
+    self._thread = threading.Thread(target=self._worker, daemon=True)
+    self._started = False
+
+  def start(self):
+    with self._lock:
+      if not self._started:
+        self._thread.start()
+        self._started = True
+        atexit.register(self.stop)
+
+  def stop(self):
+    # This method is called on process exit.
+    # We force an immediate exit to avoid a segmentation fault that often
+    # occurs with pythonmonkey during standard Python interpreter finalization.
+    os._exit(0)
+
+  def _worker(self):
+    try:
+      import pythonmonkey as pm
+    except ImportError:
+      pm = None
+
+    self._pm = pm
+    self._cache = {}
+
+    while True:
+      req = self._req_queue.get()
+      if req is None:
+        break
+
+      req_id, type_str, payload = req
+      res = None
+      is_err = False
+      try:
+        if self._pm is None:
+          raise ImportError(
+              "PythonMonkey not installed or failed to import in worker thread."
+          )
+
+        if type_str == 'exec':
+          source, row = payload
+          if source not in self._cache:
+            self._cache[source] = self._pm.eval(f"({source})")
+          func = self._cache[source]
+          res = func(row)
+      except Exception as e:
+        res = e
+        is_err = True
+
+      with self._lock:
+        if req_id in self._resp_events:
+          self._resp_data[req_id] = (is_err, res)
+          self._resp_events[req_id].set()
+
+  def eval_and_run(self, source, row):
+    if not self._started:
+      self.start()
+
+    req_id = str(uuid.uuid4())
+    event = threading.Event()
+    with self._lock:
+      self._resp_events[req_id] = event
+
+    self._req_queue.put((req_id, 'exec', (source, row)))
+    event.wait()
+
+    with self._lock:
+      is_err, result = self._resp_data.pop(req_id)
+      del self._resp_events[req_id]
+
+    if is_err:
+      raise result
+    return result
+
+
+_pythonmonkey_dispatcher = PythonMonkeyDispatcher()
+
+
+class JavaScriptCallable:
+  def __init__(self, source, original_fields=None, name=None):
+    self._source = source
+    self._name = name
+
+  def __call__(self, row):
+    # Check for pythonmonkey availability lazily (on first call)
+    if importlib.util.find_spec("pythonmonkey") is None:
+      raise RuntimeError(
+          "PythonMonkey is not installed. Please install 'apache_beam[yaml]' "
+          "to use JavaScript mapping functions.")
+
+    row_as_dict = py_value_to_js_dict(row)
+    try:
+      # If we have a name, it means we evaluated a file and need to call
+      # a specific function.
+      # Dispatcher expects a self-contained source/expression.
+      if self._name:
+        # Wrap: (function() { <source>; return <name>; })()
+        effective_source = (
+            f"(function() {{ {self._source}; return {self._name}; }})()")
+      else:
+        # Expression/Callable case: Wrap in parens to be safe
+        effective_source = f"({self._source})"
+
+      js_result = _pythonmonkey_dispatcher.eval_and_run(
+          effective_source, row_as_dict)
+
+    except Exception as exn:
+      raise RuntimeError(
+          f"Error evaluating javascript expression: {exn}") from exn
+    return dicts_to_rows(_finalize_js_result(js_result))
+
+
+def _finalize_js_result(obj):
+  """Coerce pythonmonkey objects to native Python objects (specifically
+  strings).
+  """
+  if type(obj) is str:
+    return obj
+  if isinstance(obj, str):
+    return str(obj)

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to