This is an automated email from the ASF dual-hosted git repository.
Baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 34a19f001f [SYSTEMDS-2651] Extend TCP port polling to federated monitoring backend
34a19f001f is described below
commit 34a19f001f72da2d029f55c1f7789809cbf0c45c
Author: Sebastian Baunsgaard <[email protected]>
AuthorDate: Tue May 26 01:30:47 2026 +0200
[SYSTEMDS-2651] Extend TCP port polling to federated monitoring backend
Wire startLocalFedMonitoring through FederatedWorkerUtils.waitForWorker so
that the monitoring backend's port binding is polled instead of awaited with a
fixed sleep (fixes the flaky FederatedCoordinatorIntegrationCRUDTest). Also
migrate FederatedLogicalTest to the bulk startLocalFedWorkers(int[]) API and
drop the now-unused FED_WORKER_WAIT_S and FED_MONITOR_WAIT constants.
---
.../org/apache/sysds/test/AutomatedTestBase.java | 62 ++++++++++++++--------
.../primitives/part4/FederatedLogicalTest.java | 22 ++++----
2 files changed, 49 insertions(+), 35 deletions(-)
diff --git a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
index 85a37b7dbd..150a358bdf 100644
--- a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
+++ b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
@@ -20,7 +20,6 @@
package org.apache.sysds.test;
import static java.lang.Math.ceil;
-import static java.lang.Thread.sleep;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -118,15 +117,10 @@ public abstract class AutomatedTestBase {
public static final double GPU_TOLERANCE = 1e-9;
/**
- * Default upper bound (ms) passed to federated worker readiness waits.
The wait returns as soon
- * as the worker's TCP port accepts a connection, so this value only
affects the deadline used
- * when a worker never becomes ready. {@link FederatedWorkerUtils}
clamps caller values below its
- * enforced floor up to that floor, so the effective ceiling is at
least that floor regardless
- * of this constant.
+ * Default deadline (ms) for federated worker/monitoring readiness
waits and a few legacy
+ * {@code sleep()} calls. {@link FederatedWorkerUtils} enforces its own
minimum floor.
*/
public static final int FED_WORKER_WAIT = 3000;
- public static final int FED_MONITOR_WAIT = 10000;
- public static final int FED_WORKER_WAIT_S = 50;
// The timeout for a test to fail. all tests must execute in less than
this time.
@@ -1765,29 +1759,53 @@ public abstract class AutomatedTestBase {
}
/**
- * Start new JVM for a federated monitoring backend at the port.
+ * Start a new JVM for a federated monitoring backend at the port.
*
- * @param port Port to use for the JVM
- * @return the process associated with the worker.
+ * <p>Returns once the backend's TCP port accepts connections (Netty's
bind has completed), or
+ * throws a {@link RuntimeException} once the {@link
FederatedWorkerUtils} readiness floor
+ * elapses.
+ *
+ * @param port Port to use for the JVM
+ * @param addArgs Extra CLI args to append, or null
+ * @return the process associated with the monitoring backend.
*/
protected Process startLocalFedMonitoring(int port, String[] addArgs) {
- Process process = null;
+ return startLocalFedMonitoring(port, addArgs, FED_WORKER_WAIT);
+ }
+
+ /**
+ * Start a new JVM for a federated monitoring backend at the port.
+ *
+ * <p>Returns once the backend's TCP port accepts connections, or
throws a
+ * {@link RuntimeException} after {@code timeoutMs} elapses. The
monitoring server opens the
+ * port after Netty's {@code bind().sync()} returns; a successful TCP
connect therefore signals
+ * that the HTTP listener is ready to accept requests.
+ *
+ * @param port Port to use for the JVM
+ * @param addArgs Extra CLI args to append, or null
+ * @param timeoutMs Upper bound on the wait, in ms; raised to a minimum
value enforced inside
+ * {@link FederatedWorkerUtils}.
+ * @return the process associated with the monitoring backend.
+ */
+ protected Process startLocalFedMonitoring(int port, String[] addArgs,
int timeoutMs) {
+ Process process = spawnLocalFedMonitoring(port, addArgs);
+ FederatedWorkerUtils.waitForWorker(port, timeoutMs,
process::isAlive, "monitoring process");
+ return process;
+ }
+
+ /** Spawn a federated monitoring backend JVM and return without waiting
for the port to bind. */
+ private static Process spawnLocalFedMonitoring(int port, String[]
addArgs) {
String separator = System.getProperty("file.separator");
String classpath = System.getProperty("java.class.path");
String path = System.getProperty("java.home") + separator +
"bin" + separator + "java";
- String[] args = ArrayUtils.addAll(new String[]{path, "-cp",
classpath, DMLScript.class.getName(),
- "-fedMonitoring", Integer.toString(port)},
addArgs);
- ProcessBuilder processBuilder = new ProcessBuilder(args);
-
+ String[] args = ArrayUtils.addAll(new String[] {path, "-cp",
classpath, DMLScript.class.getName(),
+ "-fedMonitoring", Integer.toString(port)}, addArgs);
try {
- process = processBuilder.start();
- // Wait till process is started
- sleep(FED_MONITOR_WAIT);
+ return new ProcessBuilder(args).start();
}
- catch(IOException | InterruptedException e) {
- throw new RuntimeException(e);
+ catch(IOException e) {
+ throw new RuntimeException("Failed to launch federated
monitoring process on port " + port, e);
}
- return process;
}
/**
diff --git
a/src/test/java/org/apache/sysds/test/functions/federated/primitives/part4/FederatedLogicalTest.java
b/src/test/java/org/apache/sysds/test/functions/federated/primitives/part4/FederatedLogicalTest.java
index ba1e7e0ea0..f8acdd0793 100644
---
a/src/test/java/org/apache/sysds/test/functions/federated/primitives/part4/FederatedLogicalTest.java
+++
b/src/test/java/org/apache/sysds/test/functions/federated/primitives/part4/FederatedLogicalTest.java
@@ -372,17 +372,15 @@ public class FederatedLogicalTest extends
AutomatedTestBase {
// empty script name because we don't execute any script, just
start the worker
fullDMLScriptName = "";
int port1 = getRandomAvailablePort();
- int port2 = (!single_fed_worker ? getRandomAvailablePort() : 0);
- int port3 = (!single_fed_worker ? getRandomAvailablePort() : 0);
- int port4 = (!single_fed_worker ? getRandomAvailablePort() : 0);
- Process thread1 = startLocalFedWorker(port1,
(!single_fed_worker ? FED_WORKER_WAIT_S : FED_WORKER_WAIT));
- Process thread2 = (!single_fed_worker ?
startLocalFedWorker(port2, FED_WORKER_WAIT_S) : null);
- Process thread3 = (!single_fed_worker ?
startLocalFedWorker(port3, FED_WORKER_WAIT_S) : null);
- Process thread4 = (!single_fed_worker ?
startLocalFedWorker(port4) : null);
-
-
+ int port2 = single_fed_worker ? 0 : getRandomAvailablePort();
+ int port3 = single_fed_worker ? 0 : getRandomAvailablePort();
+ int port4 = single_fed_worker ? 0 : getRandomAvailablePort();
+ Process[] workers = startLocalFedWorkers(single_fed_worker
+ ? new int[] {port1}
+ : new int[] {port1, port2, port3, port4});
+
try {
- if(!isAlive(thread1))
+ if(!isAlive(workers))
throw new RuntimeException("Failed starting
federated worker");
getAndLoadTestConfiguration(testname);
@@ -449,9 +447,7 @@ public class FederatedLogicalTest extends AutomatedTestBase
{
}
}
finally {
- TestUtils.shutdownThreads(thread1);
- if(!single_fed_worker)
- TestUtils.shutdownThreads(thread2, thread3,
thread4);
+ TestUtils.shutdownThreads(workers);
resetExecMode(platform_old);
}