This is an automated email from the ASF dual-hosted git repository.
Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 2d6a759a90 refactor(pyamber): flatten over-nested util packages (#4952)
2d6a759a90 is described below
commit 2d6a759a9079801a766cf98b67ba89481a5ec326
Author: Yicong Huang <[email protected]>
AuthorDate: Wed May 6 00:20:11 2026 -0700
refactor(pyamber): flatten over-nested util packages (#4952)
### What changes were proposed in this PR?
Flatten `core/util/<X>/<X>.py` single-file directories — the "one class
per directory" pattern that was transplanted from Java/Scala — into flat
modules under `core/util/`. Also extract the implementations that were
living directly inside `__init__.py` (`expression_evaluator`,
`virtual_identity`) into named modules.
### Any related issues, documentation, discussions?
Closes #4951.
### How was this PR tested?
`pytest core/util` (60 passed, 1 xfailed) and `pytest core/runnables`
(34 passed) green locally.
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.7 (Claude Code)
---
amber/src/main/python/core/python_worker.py | 4 +-
.../main/python/core/runnables/data_processor.py | 2 +-
amber/src/main/python/core/runnables/heartbeat.py | 4 +-
.../main/python/core/runnables/network_receiver.py | 2 +-
.../input_port_materialization_reader_runnable.py | 2 +-
.../main/python/core/util/{thread => }/atomic.py | 0
.../core/util/{protocol => }/base_protocols.py | 0
.../main/python/core/util/buffer/buffer_base.py | 2 +-
.../linked_blocking_multi_queue.py | 2 +-
.../core/util/customized_queue/queue_base.py | 2 +-
.../__init__.py => expression_evaluator.py} | 0
.../src/main/python/core/util/operator/__init__.py | 16 ---
.../python/core/util/{runnable => }/runnable.py | 0
.../stoppable/stoppable_queue_blocking_thread.py | 4 +-
amber/src/main/python/core/util/thread/__init__.py | 16 ---
.../__init__.py => virtual_identity.py} | 0
amber/src/test/python/core/test_python_worker.py | 125 +++++++++++++++++++++
.../python/core/util/{thread => }/test_atomic.py | 2 +-
.../test_expression_evaluator.py | 0
.../test_virtual_identity.py | 0
20 files changed, 138 insertions(+), 45 deletions(-)
diff --git a/amber/src/main/python/core/python_worker.py
b/amber/src/main/python/core/python_worker.py
index bcd0652d59..d1467de6db 100644
--- a/amber/src/main/python/core/python_worker.py
+++ b/amber/src/main/python/core/python_worker.py
@@ -20,8 +20,8 @@ from threading import Thread, Event
from core.models.internal_queue import InternalQueue
from core.runnables import MainLoop, NetworkReceiver, NetworkSender, Heartbeat
-from core.util.runnable.runnable import Runnable
-from core.util.stoppable.stoppable import Stoppable
+from core.util.runnable import Runnable
+from core.util.stoppable import Stoppable
class PythonWorker(Runnable, Stoppable):
diff --git a/amber/src/main/python/core/runnables/data_processor.py
b/amber/src/main/python/core/runnables/data_processor.py
index 089a162228..3998a3ff9a 100644
--- a/amber/src/main/python/core/runnables/data_processor.py
+++ b/amber/src/main/python/core/runnables/data_processor.py
@@ -30,7 +30,7 @@ from core.models.table import all_output_to_tuple
from core.util import Stoppable
from core.util.console_message.replace_print import replace_print
from core.util.console_message.timestamp import current_time_in_local_timezone
-from core.util.runnable.runnable import Runnable
+from core.util.runnable import Runnable
from proto.org.apache.texera.amber.engine.architecture.rpc import (
ConsoleMessage,
ConsoleMessageType,
diff --git a/amber/src/main/python/core/runnables/heartbeat.py
b/amber/src/main/python/core/runnables/heartbeat.py
index 9199518f3f..1e4c45837d 100644
--- a/amber/src/main/python/core/runnables/heartbeat.py
+++ b/amber/src/main/python/core/runnables/heartbeat.py
@@ -24,8 +24,8 @@ from loguru import logger
from overrides import overrides
from threading import Event
-from core.util.runnable.runnable import Runnable
-from core.util.stoppable.stoppable import Stoppable
+from core.util.runnable import Runnable
+from core.util.stoppable import Stoppable
class Heartbeat(Runnable, Stoppable):
diff --git a/amber/src/main/python/core/runnables/network_receiver.py
b/amber/src/main/python/core/runnables/network_receiver.py
index 659cd65c78..8ba4fbe147 100644
--- a/amber/src/main/python/core/runnables/network_receiver.py
+++ b/amber/src/main/python/core/runnables/network_receiver.py
@@ -43,7 +43,7 @@ from core.models.internal_queue import (
)
from core.proxy import ProxyServer
from core.util import Stoppable, get_one_of
-from core.util.runnable.runnable import Runnable
+from core.util.runnable import Runnable
from proto.org.apache.texera.amber.engine.architecture.rpc import
EmbeddedControlMessage
from proto.org.apache.texera.amber.engine.common import (
PythonControlMessage,
diff --git
a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
index e49c0316cc..6122bbb8b9 100644
---
a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
+++
b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
@@ -38,7 +38,7 @@ from core.models import Tuple, InternalQueue, DataFrame,
DataPayload
from core.models.internal_queue import DataElement, ECMElement
from core.storage.document_factory import DocumentFactory
from core.util import Stoppable, get_one_of
-from core.util.runnable.runnable import Runnable
+from core.util.runnable import Runnable
from core.util.virtual_identity import get_from_actor_id_for_input_port_storage
from proto.org.apache.texera.amber.core import (
ActorVirtualIdentity,
diff --git a/amber/src/main/python/core/util/thread/atomic.py
b/amber/src/main/python/core/util/atomic.py
similarity index 100%
rename from amber/src/main/python/core/util/thread/atomic.py
rename to amber/src/main/python/core/util/atomic.py
diff --git a/amber/src/main/python/core/util/protocol/base_protocols.py
b/amber/src/main/python/core/util/base_protocols.py
similarity index 100%
rename from amber/src/main/python/core/util/protocol/base_protocols.py
rename to amber/src/main/python/core/util/base_protocols.py
diff --git a/amber/src/main/python/core/util/buffer/buffer_base.py
b/amber/src/main/python/core/util/buffer/buffer_base.py
index 244b735740..10a44d28aa 100644
--- a/amber/src/main/python/core/util/buffer/buffer_base.py
+++ b/amber/src/main/python/core/util/buffer/buffer_base.py
@@ -17,7 +17,7 @@
from abc import ABCMeta
-from core.util.protocol.base_protocols import FlushedGetable, Putable
+from core.util.base_protocols import FlushedGetable, Putable
class IBuffer(FlushedGetable, Putable, metaclass=ABCMeta):
diff --git
a/amber/src/main/python/core/util/customized_queue/linked_blocking_multi_queue.py
b/amber/src/main/python/core/util/customized_queue/linked_blocking_multi_queue.py
index 3b46e6db4d..735f0f6dc0 100644
---
a/amber/src/main/python/core/util/customized_queue/linked_blocking_multi_queue.py
+++
b/amber/src/main/python/core/util/customized_queue/linked_blocking_multi_queue.py
@@ -23,7 +23,7 @@ from typing import List, Optional, Generic, TypeVar,
MutableMapping
from core.util.customized_queue.inner import inner
from core.util.customized_queue.queue_base import IKeyedQueue
-from core.util.thread.atomic import AtomicInteger
+from core.util.atomic import AtomicInteger
K = TypeVar("K")
T = TypeVar("T")
diff --git a/amber/src/main/python/core/util/customized_queue/queue_base.py
b/amber/src/main/python/core/util/customized_queue/queue_base.py
index 47b8aac94e..4ee96312cb 100644
--- a/amber/src/main/python/core/util/customized_queue/queue_base.py
+++ b/amber/src/main/python/core/util/customized_queue/queue_base.py
@@ -18,7 +18,7 @@
from abc import ABCMeta
from dataclasses import dataclass
-from core.util.protocol.base_protocols import (
+from core.util.base_protocols import (
Putable,
Getable,
EmtpyCheckable,
diff --git a/amber/src/main/python/core/util/expression_evaluator/__init__.py
b/amber/src/main/python/core/util/expression_evaluator.py
similarity index 100%
rename from amber/src/main/python/core/util/expression_evaluator/__init__.py
rename to amber/src/main/python/core/util/expression_evaluator.py
diff --git a/amber/src/main/python/core/util/operator/__init__.py
b/amber/src/main/python/core/util/operator/__init__.py
deleted file mode 100644
index 13a83393a9..0000000000
--- a/amber/src/main/python/core/util/operator/__init__.py
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
diff --git a/amber/src/main/python/core/util/runnable/runnable.py
b/amber/src/main/python/core/util/runnable.py
similarity index 100%
rename from amber/src/main/python/core/util/runnable/runnable.py
rename to amber/src/main/python/core/util/runnable.py
diff --git
a/amber/src/main/python/core/util/stoppable/stoppable_queue_blocking_thread.py
b/amber/src/main/python/core/util/stoppable/stoppable_queue_blocking_thread.py
index d20073631b..992ad596fe 100644
---
a/amber/src/main/python/core/util/stoppable/stoppable_queue_blocking_thread.py
+++
b/amber/src/main/python/core/util/stoppable/stoppable_queue_blocking_thread.py
@@ -19,8 +19,8 @@ from loguru import logger
from overrides import overrides
from core.util.customized_queue.queue_base import IQueue, QueueControl,
QueueElement
-from core.util.runnable.runnable import Runnable
-from core.util.stoppable.stoppable import Stoppable
+from core.util.runnable import Runnable
+from .stoppable import Stoppable
class StoppableQueueBlockingRunnable(Runnable, Stoppable):
diff --git a/amber/src/main/python/core/util/thread/__init__.py
b/amber/src/main/python/core/util/thread/__init__.py
deleted file mode 100644
index 13a83393a9..0000000000
--- a/amber/src/main/python/core/util/thread/__init__.py
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
diff --git a/amber/src/main/python/core/util/virtual_identity/__init__.py
b/amber/src/main/python/core/util/virtual_identity.py
similarity index 100%
rename from amber/src/main/python/core/util/virtual_identity/__init__.py
rename to amber/src/main/python/core/util/virtual_identity.py
diff --git a/amber/src/test/python/core/test_python_worker.py
b/amber/src/test/python/core/test_python_worker.py
new file mode 100644
index 0000000000..25658fc78b
--- /dev/null
+++ b/amber/src/test/python/core/test_python_worker.py
@@ -0,0 +1,125 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+import core.python_worker as pw
+
+
+class _FakeReceiver:
+ def __init__(self, input_queue, host):
+ self.input_queue = input_queue
+ self.host = host
+ self.proxy_server = type(
+ "FakeProxyServer", (), {"get_port_number": staticmethod(lambda:
12345)}
+ )()
+ self._shutdown_cb = None
+
+ def register_shutdown(self, cb):
+ self._shutdown_cb = cb
+
+ def run(self):
+ pass
+
+ def stop(self):
+ pass
+
+
+class _FakeSender:
+ def __init__(self, output_queue, host, port, handshake_port):
+ self.output_queue = output_queue
+ self.host = host
+ self.port = port
+ self.handshake_port = handshake_port
+ self.stopped = False
+
+ def run(self):
+ pass
+
+ def stop(self):
+ self.stopped = True
+
+
+class _FakeMainLoop:
+ def __init__(self, worker_id, input_queue, output_queue):
+ self.worker_id = worker_id
+ self.stopped = False
+
+ def run(self):
+ pass
+
+ def stop(self):
+ self.stopped = True
+
+
+class _FakeHeartbeat:
+ def __init__(self, host, port, interval, stop_event):
+ self.host = host
+ self.port = port
+ self.interval = interval
+ self.stop_event = stop_event
+ self.stopped = False
+
+ def run(self):
+ pass
+
+ def stop(self):
+ self.stopped = True
+
+
[email protected]
+def stub_network(monkeypatch):
+ monkeypatch.setattr(pw, "NetworkReceiver", _FakeReceiver)
+ monkeypatch.setattr(pw, "NetworkSender", _FakeSender)
+ monkeypatch.setattr(pw, "MainLoop", _FakeMainLoop)
+ monkeypatch.setattr(pw, "Heartbeat", _FakeHeartbeat)
+
+
+class TestPythonWorker:
+ @pytest.mark.timeout(5)
+ def test_construction_wires_dependencies(self, stub_network):
+ worker = pw.PythonWorker(worker_id="w-1", host="localhost",
output_port=9999)
+
+ # NetworkSender must receive the handshake port from the receiver's
+ # proxy server — this is the Java→Python wiring contract.
+ assert worker._network_sender.handshake_port == 12345
+ assert worker._network_sender.port == 9999
+ # The receiver's shutdown callback is wired to worker.stop so a
+ # client-side disconnect tears the worker down.
+ assert worker._network_receiver._shutdown_cb == worker.stop
+
+ @pytest.mark.timeout(5)
+ def test_stop_cascades_to_main_loop_sender_and_heartbeat(self,
stub_network):
+ worker = pw.PythonWorker(worker_id="w-1", host="localhost",
output_port=9999)
+
+ worker.stop()
+
+ assert worker._main_loop.stopped is True
+ assert worker._network_sender.stopped is True
+ assert worker._heartbeat.stopped is True
+
+ @pytest.mark.timeout(5)
+ def test_run_sets_stop_event_after_main_loop_returns(self, stub_network):
+ worker = pw.PythonWorker(worker_id="w-1", host="localhost",
output_port=9999)
+
+ # All fakes' run() return immediately, so run() drains all threads
+ # without blocking. The contract is that the heartbeat stop event
+ # is set after the main loop / sender threads join, so the
+ # heartbeat thread can exit cleanly.
+ worker.run()
+
+ assert worker._stop_event.is_set()
diff --git a/amber/src/test/python/core/util/thread/test_atomic.py
b/amber/src/test/python/core/util/test_atomic.py
similarity index 99%
rename from amber/src/test/python/core/util/thread/test_atomic.py
rename to amber/src/test/python/core/util/test_atomic.py
index fa6238e0eb..0f6b393233 100644
--- a/amber/src/test/python/core/util/thread/test_atomic.py
+++ b/amber/src/test/python/core/util/test_atomic.py
@@ -19,7 +19,7 @@ import threading
import pytest
-from core.util.thread.atomic import AtomicInteger
+from core.util.atomic import AtomicInteger
class TestAtomicIntegerSingleThreaded:
diff --git
a/amber/src/test/python/core/util/expression_evaluator/test_expression_evaluator.py
b/amber/src/test/python/core/util/test_expression_evaluator.py
similarity index 100%
rename from
amber/src/test/python/core/util/expression_evaluator/test_expression_evaluator.py
rename to amber/src/test/python/core/util/test_expression_evaluator.py
diff --git
a/amber/src/test/python/core/util/virtual_identity/test_virtual_identity.py
b/amber/src/test/python/core/util/test_virtual_identity.py
similarity index 100%
rename from
amber/src/test/python/core/util/virtual_identity/test_virtual_identity.py
rename to amber/src/test/python/core/util/test_virtual_identity.py