charlesdong1991 commented on code in PR #435:
URL: https://github.com/apache/fluss-rust/pull/435#discussion_r2901654842


##########
bindings/python/test/conftest.py:
##########
@@ -112,42 +145,121 @@ def fluss_cluster():
         "netty.server.num-network-threads: 1",
         "netty.server.num-worker-threads: 3",
     ])
+
+    zookeeper = (
+        DockerContainer("zookeeper:3.9.2")
+        .with_kwargs(network=NETWORK_NAME)
+        .with_name(ZOOKEEPER_NAME)
+    )
+    coordinator = (
+        DockerContainer(f"{FLUSS_IMAGE}:{FLUSS_VERSION}")
+        .with_kwargs(network=NETWORK_NAME)
+        .with_name(COORDINATOR_NAME)
+        .with_bind_ports(9123, 9123)
+        .with_bind_ports(9223, 9223)
+        .with_command("coordinatorServer")
+        .with_env("FLUSS_PROPERTIES", coordinator_props)
+    )
     tablet_server = (
         DockerContainer(f"{FLUSS_IMAGE}:{FLUSS_VERSION}")
-        .with_network(network)
-        .with_name("tablet-server-python-test")
+        .with_kwargs(network=NETWORK_NAME)
+        .with_name(TABLET_SERVER_NAME)
         .with_bind_ports(9123, 9124)
         .with_bind_ports(9223, 9224)
         .with_command("tabletServer")
         .with_env("FLUSS_PROPERTIES", tablet_props)
     )
 
-    zookeeper.start()
-    coordinator.start()
-    tablet_server.start()
+    try:
+        zookeeper.start()
+        coordinator.start()
+        tablet_server.start()
+    except Exception as e:

Review Comment:
   This is a bit broad; maybe consider narrowing it to Docker-specific errors?
   
   By the way, if I understand correctly, we want to start the Fluss cluster exactly once per test session. For that case, the pytest-xdist docs
https://pytest-xdist.readthedocs.io/en/latest/how-to.html#making-session-scoped-fixtures-execute-only-once
   recommend using a file lock to coordinate startup.



##########
bindings/python/test/conftest.py:
##########
@@ -37,47 +50,77 @@
 FLUSS_VERSION = "0.9.0-incubating"
 BOOTSTRAP_SERVERS_ENV = os.environ.get("FLUSS_BOOTSTRAP_SERVERS")
 
+# Container / network names
+NETWORK_NAME = "fluss-python-test-network"
+ZOOKEEPER_NAME = "zookeeper-python-test"
+COORDINATOR_NAME = "coordinator-server-python-test"
+TABLET_SERVER_NAME = "tablet-server-python-test"
+
+# Fixed host ports (must match across workers)
+COORDINATOR_PORT = 9123
+TABLET_SERVER_PORT = 9124
+PLAIN_CLIENT_PORT = 9223
+PLAIN_CLIENT_TABLET_PORT = 9224
+
+ALL_PORTS = [COORDINATOR_PORT, TABLET_SERVER_PORT, PLAIN_CLIENT_PORT, PLAIN_CLIENT_TABLET_PORT]
+
 
 def _wait_for_port(host, port, timeout=60):
     """Wait for a TCP port to become available."""
     start = time.time()
     while time.time() - start < timeout:
         try:
             with socket.create_connection((host, port), timeout=1):
-                return
+                return True
         except (ConnectionRefusedError, TimeoutError, OSError):
             time.sleep(1)
-    raise TimeoutError(f"Port {port} on {host} not available after {timeout}s")
+    return False
 
 
[email protected](scope="session")
-def fluss_cluster():
-    """Start a Fluss cluster using testcontainers, or use an existing one."""
-    if BOOTSTRAP_SERVERS_ENV:
-        yield (BOOTSTRAP_SERVERS_ENV, BOOTSTRAP_SERVERS_ENV)
+def _all_ports_ready(timeout=60):
+    """Wait for all cluster ports to become available."""
+    deadline = time.time() + timeout
+    for port in ALL_PORTS:
+        remaining = deadline - time.time()
+        if remaining <= 0 or not _wait_for_port("localhost", port, timeout=remaining):
+            return False
+    return True
+
+
+def _run_cmd(cmd):
+    """Run a command (list form), return exit code."""
+    return subprocess.run(cmd, capture_output=True).returncode

Review Comment:
   If the command fails, we now only get the exit code, not the error message (the output is captured by `capture_output=True` but then discarded). Could that make debugging harder when something goes wrong?



##########
bindings/python/test/conftest.py:
##########
@@ -37,47 +50,77 @@
 FLUSS_VERSION = "0.9.0-incubating"
 BOOTSTRAP_SERVERS_ENV = os.environ.get("FLUSS_BOOTSTRAP_SERVERS")
 
+# Container / network names
+NETWORK_NAME = "fluss-python-test-network"
+ZOOKEEPER_NAME = "zookeeper-python-test"
+COORDINATOR_NAME = "coordinator-server-python-test"
+TABLET_SERVER_NAME = "tablet-server-python-test"
+
+# Fixed host ports (must match across workers)
+COORDINATOR_PORT = 9123
+TABLET_SERVER_PORT = 9124
+PLAIN_CLIENT_PORT = 9223
+PLAIN_CLIENT_TABLET_PORT = 9224
+
+ALL_PORTS = [COORDINATOR_PORT, TABLET_SERVER_PORT, PLAIN_CLIENT_PORT, PLAIN_CLIENT_TABLET_PORT]
+
 
 def _wait_for_port(host, port, timeout=60):
     """Wait for a TCP port to become available."""
     start = time.time()
     while time.time() - start < timeout:
         try:
             with socket.create_connection((host, port), timeout=1):
-                return
+                return True
         except (ConnectionRefusedError, TimeoutError, OSError):
             time.sleep(1)
-    raise TimeoutError(f"Port {port} on {host} not available after {timeout}s")
+    return False
 
 
[email protected](scope="session")
-def fluss_cluster():
-    """Start a Fluss cluster using testcontainers, or use an existing one."""
-    if BOOTSTRAP_SERVERS_ENV:
-        yield (BOOTSTRAP_SERVERS_ENV, BOOTSTRAP_SERVERS_ENV)
+def _all_ports_ready(timeout=60):
+    """Wait for all cluster ports to become available."""
+    deadline = time.time() + timeout
+    for port in ALL_PORTS:
+        remaining = deadline - time.time()
+        if remaining <= 0 or not _wait_for_port("localhost", port, timeout=remaining):
+            return False
+    return True
+
+
+def _run_cmd(cmd):
+    """Run a command (list form), return exit code."""
+    return subprocess.run(cmd, capture_output=True).returncode
+
+
+def _start_cluster():
+    """Start the Fluss Docker cluster via testcontainers.
+
+    If another worker already started the cluster (detected via port check),
+    reuse it. If container creation fails (name conflict from a racing worker),
+    wait for the other worker's cluster to become ready.
+    """
+    # Reuse cluster started by another parallel worker or previous run.
+    if _wait_for_port("localhost", PLAIN_CLIENT_PORT, timeout=1):

Review Comment:
   I'm curious: is a 1s timeout too short here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to