This is an automated email from the ASF dual-hosted git repository.

Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 2d6a759a90 refactor(pyamber): flatten over-nested util packages (#4952)
2d6a759a90 is described below

commit 2d6a759a9079801a766cf98b67ba89481a5ec326
Author: Yicong Huang <[email protected]>
AuthorDate: Wed May 6 00:20:11 2026 -0700

    refactor(pyamber): flatten over-nested util packages (#4952)
    
    ### What changes were proposed in this PR?
    
    Flatten `core/util/<X>/<X>.py` single-file directories — the "one class
    per directory" pattern that was transplanted from Java/Scala — into flat
    modules under `core/util/`. Also extract the implementations that were
    living directly inside `__init__.py` (`expression_evaluator`,
    `virtual_identity`) into named modules.
    
    ### Any related issues, documentation, discussions?
    
    Closes #4951.
    
    ### How was this PR tested?
    
    `pytest core/util` (60 passed, 1 xfailed) and `pytest core/runnables`
    (34 passed) green locally.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Opus 4.7 (Claude Code)
---
 amber/src/main/python/core/python_worker.py        |   4 +-
 .../main/python/core/runnables/data_processor.py   |   2 +-
 amber/src/main/python/core/runnables/heartbeat.py  |   4 +-
 .../main/python/core/runnables/network_receiver.py |   2 +-
 .../input_port_materialization_reader_runnable.py  |   2 +-
 .../main/python/core/util/{thread => }/atomic.py   |   0
 .../core/util/{protocol => }/base_protocols.py     |   0
 .../main/python/core/util/buffer/buffer_base.py    |   2 +-
 .../linked_blocking_multi_queue.py                 |   2 +-
 .../core/util/customized_queue/queue_base.py       |   2 +-
 .../__init__.py => expression_evaluator.py}        |   0
 .../src/main/python/core/util/operator/__init__.py |  16 ---
 .../python/core/util/{runnable => }/runnable.py    |   0
 .../stoppable/stoppable_queue_blocking_thread.py   |   4 +-
 amber/src/main/python/core/util/thread/__init__.py |  16 ---
 .../__init__.py => virtual_identity.py}            |   0
 amber/src/test/python/core/test_python_worker.py   | 125 +++++++++++++++++++++
 .../python/core/util/{thread => }/test_atomic.py   |   2 +-
 .../test_expression_evaluator.py                   |   0
 .../test_virtual_identity.py                       |   0
 20 files changed, 138 insertions(+), 45 deletions(-)

diff --git a/amber/src/main/python/core/python_worker.py 
b/amber/src/main/python/core/python_worker.py
index bcd0652d59..d1467de6db 100644
--- a/amber/src/main/python/core/python_worker.py
+++ b/amber/src/main/python/core/python_worker.py
@@ -20,8 +20,8 @@ from threading import Thread, Event
 
 from core.models.internal_queue import InternalQueue
 from core.runnables import MainLoop, NetworkReceiver, NetworkSender, Heartbeat
-from core.util.runnable.runnable import Runnable
-from core.util.stoppable.stoppable import Stoppable
+from core.util.runnable import Runnable
+from core.util.stoppable import Stoppable
 
 
 class PythonWorker(Runnable, Stoppable):
diff --git a/amber/src/main/python/core/runnables/data_processor.py 
b/amber/src/main/python/core/runnables/data_processor.py
index 089a162228..3998a3ff9a 100644
--- a/amber/src/main/python/core/runnables/data_processor.py
+++ b/amber/src/main/python/core/runnables/data_processor.py
@@ -30,7 +30,7 @@ from core.models.table import all_output_to_tuple
 from core.util import Stoppable
 from core.util.console_message.replace_print import replace_print
 from core.util.console_message.timestamp import current_time_in_local_timezone
-from core.util.runnable.runnable import Runnable
+from core.util.runnable import Runnable
 from proto.org.apache.texera.amber.engine.architecture.rpc import (
     ConsoleMessage,
     ConsoleMessageType,
diff --git a/amber/src/main/python/core/runnables/heartbeat.py 
b/amber/src/main/python/core/runnables/heartbeat.py
index 9199518f3f..1e4c45837d 100644
--- a/amber/src/main/python/core/runnables/heartbeat.py
+++ b/amber/src/main/python/core/runnables/heartbeat.py
@@ -24,8 +24,8 @@ from loguru import logger
 from overrides import overrides
 from threading import Event
 
-from core.util.runnable.runnable import Runnable
-from core.util.stoppable.stoppable import Stoppable
+from core.util.runnable import Runnable
+from core.util.stoppable import Stoppable
 
 
 class Heartbeat(Runnable, Stoppable):
diff --git a/amber/src/main/python/core/runnables/network_receiver.py 
b/amber/src/main/python/core/runnables/network_receiver.py
index 659cd65c78..8ba4fbe147 100644
--- a/amber/src/main/python/core/runnables/network_receiver.py
+++ b/amber/src/main/python/core/runnables/network_receiver.py
@@ -43,7 +43,7 @@ from core.models.internal_queue import (
 )
 from core.proxy import ProxyServer
 from core.util import Stoppable, get_one_of
-from core.util.runnable.runnable import Runnable
+from core.util.runnable import Runnable
 from proto.org.apache.texera.amber.engine.architecture.rpc import 
EmbeddedControlMessage
 from proto.org.apache.texera.amber.engine.common import (
     PythonControlMessage,
diff --git 
a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
 
b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
index e49c0316cc..6122bbb8b9 100644
--- 
a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
+++ 
b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
@@ -38,7 +38,7 @@ from core.models import Tuple, InternalQueue, DataFrame, 
DataPayload
 from core.models.internal_queue import DataElement, ECMElement
 from core.storage.document_factory import DocumentFactory
 from core.util import Stoppable, get_one_of
-from core.util.runnable.runnable import Runnable
+from core.util.runnable import Runnable
 from core.util.virtual_identity import get_from_actor_id_for_input_port_storage
 from proto.org.apache.texera.amber.core import (
     ActorVirtualIdentity,
diff --git a/amber/src/main/python/core/util/thread/atomic.py 
b/amber/src/main/python/core/util/atomic.py
similarity index 100%
rename from amber/src/main/python/core/util/thread/atomic.py
rename to amber/src/main/python/core/util/atomic.py
diff --git a/amber/src/main/python/core/util/protocol/base_protocols.py 
b/amber/src/main/python/core/util/base_protocols.py
similarity index 100%
rename from amber/src/main/python/core/util/protocol/base_protocols.py
rename to amber/src/main/python/core/util/base_protocols.py
diff --git a/amber/src/main/python/core/util/buffer/buffer_base.py 
b/amber/src/main/python/core/util/buffer/buffer_base.py
index 244b735740..10a44d28aa 100644
--- a/amber/src/main/python/core/util/buffer/buffer_base.py
+++ b/amber/src/main/python/core/util/buffer/buffer_base.py
@@ -17,7 +17,7 @@
 
 from abc import ABCMeta
 
-from core.util.protocol.base_protocols import FlushedGetable, Putable
+from core.util.base_protocols import FlushedGetable, Putable
 
 
 class IBuffer(FlushedGetable, Putable, metaclass=ABCMeta):
diff --git 
a/amber/src/main/python/core/util/customized_queue/linked_blocking_multi_queue.py
 
b/amber/src/main/python/core/util/customized_queue/linked_blocking_multi_queue.py
index 3b46e6db4d..735f0f6dc0 100644
--- 
a/amber/src/main/python/core/util/customized_queue/linked_blocking_multi_queue.py
+++ 
b/amber/src/main/python/core/util/customized_queue/linked_blocking_multi_queue.py
@@ -23,7 +23,7 @@ from typing import List, Optional, Generic, TypeVar, 
MutableMapping
 
 from core.util.customized_queue.inner import inner
 from core.util.customized_queue.queue_base import IKeyedQueue
-from core.util.thread.atomic import AtomicInteger
+from core.util.atomic import AtomicInteger
 
 K = TypeVar("K")
 T = TypeVar("T")
diff --git a/amber/src/main/python/core/util/customized_queue/queue_base.py 
b/amber/src/main/python/core/util/customized_queue/queue_base.py
index 47b8aac94e..4ee96312cb 100644
--- a/amber/src/main/python/core/util/customized_queue/queue_base.py
+++ b/amber/src/main/python/core/util/customized_queue/queue_base.py
@@ -18,7 +18,7 @@
 from abc import ABCMeta
 from dataclasses import dataclass
 
-from core.util.protocol.base_protocols import (
+from core.util.base_protocols import (
     Putable,
     Getable,
     EmtpyCheckable,
diff --git a/amber/src/main/python/core/util/expression_evaluator/__init__.py 
b/amber/src/main/python/core/util/expression_evaluator.py
similarity index 100%
rename from amber/src/main/python/core/util/expression_evaluator/__init__.py
rename to amber/src/main/python/core/util/expression_evaluator.py
diff --git a/amber/src/main/python/core/util/operator/__init__.py 
b/amber/src/main/python/core/util/operator/__init__.py
deleted file mode 100644
index 13a83393a9..0000000000
--- a/amber/src/main/python/core/util/operator/__init__.py
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
diff --git a/amber/src/main/python/core/util/runnable/runnable.py 
b/amber/src/main/python/core/util/runnable.py
similarity index 100%
rename from amber/src/main/python/core/util/runnable/runnable.py
rename to amber/src/main/python/core/util/runnable.py
diff --git 
a/amber/src/main/python/core/util/stoppable/stoppable_queue_blocking_thread.py 
b/amber/src/main/python/core/util/stoppable/stoppable_queue_blocking_thread.py
index d20073631b..992ad596fe 100644
--- 
a/amber/src/main/python/core/util/stoppable/stoppable_queue_blocking_thread.py
+++ 
b/amber/src/main/python/core/util/stoppable/stoppable_queue_blocking_thread.py
@@ -19,8 +19,8 @@ from loguru import logger
 from overrides import overrides
 
 from core.util.customized_queue.queue_base import IQueue, QueueControl, 
QueueElement
-from core.util.runnable.runnable import Runnable
-from core.util.stoppable.stoppable import Stoppable
+from core.util.runnable import Runnable
+from .stoppable import Stoppable
 
 
 class StoppableQueueBlockingRunnable(Runnable, Stoppable):
diff --git a/amber/src/main/python/core/util/thread/__init__.py 
b/amber/src/main/python/core/util/thread/__init__.py
deleted file mode 100644
index 13a83393a9..0000000000
--- a/amber/src/main/python/core/util/thread/__init__.py
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
diff --git a/amber/src/main/python/core/util/virtual_identity/__init__.py 
b/amber/src/main/python/core/util/virtual_identity.py
similarity index 100%
rename from amber/src/main/python/core/util/virtual_identity/__init__.py
rename to amber/src/main/python/core/util/virtual_identity.py
diff --git a/amber/src/test/python/core/test_python_worker.py 
b/amber/src/test/python/core/test_python_worker.py
new file mode 100644
index 0000000000..25658fc78b
--- /dev/null
+++ b/amber/src/test/python/core/test_python_worker.py
@@ -0,0 +1,125 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+import core.python_worker as pw
+
+
+class _FakeReceiver:
+    def __init__(self, input_queue, host):
+        self.input_queue = input_queue
+        self.host = host
+        self.proxy_server = type(
+            "FakeProxyServer", (), {"get_port_number": staticmethod(lambda: 
12345)}
+        )()
+        self._shutdown_cb = None
+
+    def register_shutdown(self, cb):
+        self._shutdown_cb = cb
+
+    def run(self):
+        pass
+
+    def stop(self):
+        pass
+
+
+class _FakeSender:
+    def __init__(self, output_queue, host, port, handshake_port):
+        self.output_queue = output_queue
+        self.host = host
+        self.port = port
+        self.handshake_port = handshake_port
+        self.stopped = False
+
+    def run(self):
+        pass
+
+    def stop(self):
+        self.stopped = True
+
+
+class _FakeMainLoop:
+    def __init__(self, worker_id, input_queue, output_queue):
+        self.worker_id = worker_id
+        self.stopped = False
+
+    def run(self):
+        pass
+
+    def stop(self):
+        self.stopped = True
+
+
+class _FakeHeartbeat:
+    def __init__(self, host, port, interval, stop_event):
+        self.host = host
+        self.port = port
+        self.interval = interval
+        self.stop_event = stop_event
+        self.stopped = False
+
+    def run(self):
+        pass
+
+    def stop(self):
+        self.stopped = True
+
+
[email protected]
+def stub_network(monkeypatch):
+    monkeypatch.setattr(pw, "NetworkReceiver", _FakeReceiver)
+    monkeypatch.setattr(pw, "NetworkSender", _FakeSender)
+    monkeypatch.setattr(pw, "MainLoop", _FakeMainLoop)
+    monkeypatch.setattr(pw, "Heartbeat", _FakeHeartbeat)
+
+
+class TestPythonWorker:
+    @pytest.mark.timeout(5)
+    def test_construction_wires_dependencies(self, stub_network):
+        worker = pw.PythonWorker(worker_id="w-1", host="localhost", 
output_port=9999)
+
+        # NetworkSender must receive the handshake port from the receiver's
+        # proxy server — this is the Java→Python wiring contract.
+        assert worker._network_sender.handshake_port == 12345
+        assert worker._network_sender.port == 9999
+        # The receiver's shutdown callback is wired to worker.stop so a
+        # client-side disconnect tears the worker down.
+        assert worker._network_receiver._shutdown_cb == worker.stop
+
+    @pytest.mark.timeout(5)
+    def test_stop_cascades_to_main_loop_sender_and_heartbeat(self, 
stub_network):
+        worker = pw.PythonWorker(worker_id="w-1", host="localhost", 
output_port=9999)
+
+        worker.stop()
+
+        assert worker._main_loop.stopped is True
+        assert worker._network_sender.stopped is True
+        assert worker._heartbeat.stopped is True
+
+    @pytest.mark.timeout(5)
+    def test_run_sets_stop_event_after_main_loop_returns(self, stub_network):
+        worker = pw.PythonWorker(worker_id="w-1", host="localhost", 
output_port=9999)
+
+        # All fakes' run() return immediately, so run() drains all threads
+        # without blocking. The contract is that the heartbeat stop event
+        # is set after the main loop / sender threads join, so the
+        # heartbeat thread can exit cleanly.
+        worker.run()
+
+        assert worker._stop_event.is_set()
diff --git a/amber/src/test/python/core/util/thread/test_atomic.py 
b/amber/src/test/python/core/util/test_atomic.py
similarity index 99%
rename from amber/src/test/python/core/util/thread/test_atomic.py
rename to amber/src/test/python/core/util/test_atomic.py
index fa6238e0eb..0f6b393233 100644
--- a/amber/src/test/python/core/util/thread/test_atomic.py
+++ b/amber/src/test/python/core/util/test_atomic.py
@@ -19,7 +19,7 @@ import threading
 
 import pytest
 
-from core.util.thread.atomic import AtomicInteger
+from core.util.atomic import AtomicInteger
 
 
 class TestAtomicIntegerSingleThreaded:
diff --git 
a/amber/src/test/python/core/util/expression_evaluator/test_expression_evaluator.py
 b/amber/src/test/python/core/util/test_expression_evaluator.py
similarity index 100%
rename from 
amber/src/test/python/core/util/expression_evaluator/test_expression_evaluator.py
rename to amber/src/test/python/core/util/test_expression_evaluator.py
diff --git 
a/amber/src/test/python/core/util/virtual_identity/test_virtual_identity.py 
b/amber/src/test/python/core/util/test_virtual_identity.py
similarity index 100%
rename from 
amber/src/test/python/core/util/virtual_identity/test_virtual_identity.py
rename to amber/src/test/python/core/util/test_virtual_identity.py

Reply via email to