This is an automated email from the ASF dual-hosted git repository.

Baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 88c26e2ae7 [SYSTEMDS-2651] Poll for async compression in federated component tests (#2472)
88c26e2ae7 is described below

commit 88c26e2ae79c70d5b30bc881c03c368255d8a470
Author: Sebastian Baunsgaard <[email protected]>
AuthorDate: Tue May 26 16:46:13 2026 +0200

    [SYSTEMDS-2651] Poll for async compression in federated component tests (#2472)
    
    FedWorkerReadMatrixCompress.verifyRead failed roughly once in every ten
    component-test CI runs. It called FederatedTestUtils.wait(1000) to give
    the worker time to finish its async compression (kicked off by
    CompressedMatrixBlockFactory.compressAsync) and then asserted that the
    returned block was a CompressedMatrixBlock. On a contended runner the
    1 s sleep was not always enough: the subsequent read returned the
    still-uncompressed block and the assertion failed. Surefire's
    rerunFailingTestsCount=2 masked this by reporting a "Flake" instead of
    a job failure.
    
    Add FedWorkerBase.awaitCompressed(long id), which polls getMatrixBlock
    at 25 ms intervals for up to COMPRESS_TIMEOUT_MS (10 s). It returns as
    soon as the worker reports the compressed form. On timeout it returns
    the last-observed block, so the caller's assertion still produces a
    meaningful failure.
    
    Convert the three call sites that used the fixed-sleep anti-pattern:
    - FedWorkerReadMatrixCompress.verifyRead (the actual CI flake)
    - FedWorkerMatrixCompress.verifySameOrAlsoCompressedAsLocalCompress
      (polls only when the local compression yields a
      CompressedMatrixBlock, so the "do not compress" parametrization
      stays fast)
    - FedWorkerMatrixMultiplyWorkload.verifySameOrAlsoCompressedAsLocalCompress
    
    Remove the now-unused FederatedTestUtils.wait helper so the
    anti-pattern is harder to reintroduce.
---
 .../test/component/federated/FedWorkerBase.java    | 39 ++++++++++++++++++++++
 .../federated/FedWorkerMatrixCompress.java         | 12 ++++---
 .../federated/FedWorkerMatrixMultiplyWorkload.java | 13 +++-----
 .../federated/FedWorkerReadMatrixCompress.java     |  7 ++--
 .../component/federated/FederatedTestUtils.java    |  9 -----
 5 files changed, 54 insertions(+), 26 deletions(-)

diff --git 
a/src/test/java/org/apache/sysds/test/component/federated/FedWorkerBase.java 
b/src/test/java/org/apache/sysds/test/component/federated/FedWorkerBase.java
index 1bf5d33006..2c854b4a81 100644
--- a/src/test/java/org/apache/sysds/test/component/federated/FedWorkerBase.java
+++ b/src/test/java/org/apache/sysds/test/component/federated/FedWorkerBase.java
@@ -26,12 +26,19 @@ import java.net.InetSocketAddress;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.test.AutomatedTestBase;
 
 public abstract class FedWorkerBase {
        protected static final Log LOG = 
LogFactory.getLog(FedWorkerBase.class.getName());
 
+       /** Upper bound (ms) for {@link #awaitCompressed(long)} polling against 
async worker-side compression. */
+       protected static final int COMPRESS_TIMEOUT_MS = 10_000;
+
+       /** Poll interval used by {@link #awaitCompressed(long)} between 
successive reads. */
+       private static final int COMPRESS_POLL_INTERVAL_MS = 25;
+
        private final InetSocketAddress addr;
        public final int port;
 
@@ -70,6 +77,38 @@ public abstract class FedWorkerBase {
                return FederatedTestUtils.getMatrixBlock(id, addr);
        }
 
+       /**
+        * Poll the federated worker until the matrix at {@code id} is observed 
as a
+        * {@link CompressedMatrixBlock}, or {@link #COMPRESS_TIMEOUT_MS} 
elapses.
+        *
+        * <p>Federated workers compress asynchronously after a PUT/READ_VAR 
(see
+        * {@code CompressedMatrixBlockFactory.compressAsync}), so a {@code 
getMatrixBlock} fired right
+        * after the operation can race against the in-flight compression and 
return the uncompressed
+        * block. Tests that need to observe the compressed form should poll 
instead of sleeping a fixed
+        * amount.
+        *
+        * <p>On timeout this returns the most recent (uncompressed) read so 
the caller can produce a
+        * meaningful assertion failure naming the variable.
+        *
+        * @param id federated variable id
+        * @return the matrix block, compressed if compression finished in 
time, otherwise the latest read
+        */
+       public MatrixBlock awaitCompressed(long id) {
+               final long deadline = System.currentTimeMillis() + 
COMPRESS_TIMEOUT_MS;
+               MatrixBlock mb = getMatrixBlock(id);
+               while(!(mb instanceof CompressedMatrixBlock) && 
System.currentTimeMillis() < deadline) {
+                       try {
+                               Thread.sleep(COMPRESS_POLL_INTERVAL_MS);
+                       }
+                       catch(InterruptedException ie) {
+                               Thread.currentThread().interrupt();
+                               fail("Interrupted while waiting for federated 
compression of id=" + id);
+                       }
+                       mb = getMatrixBlock(id);
+               }
+               return mb;
+       }
+
        public long matrixMult(long idLeft, long idRight) {
                return FederatedTestUtils.exec_MM(idLeft, idRight, addr);
        }
diff --git 
a/src/test/java/org/apache/sysds/test/component/federated/FedWorkerMatrixCompress.java
 
b/src/test/java/org/apache/sysds/test/component/federated/FedWorkerMatrixCompress.java
index 29c6f94e7a..2b5ff327ef 100644
--- 
a/src/test/java/org/apache/sysds/test/component/federated/FedWorkerMatrixCompress.java
+++ 
b/src/test/java/org/apache/sysds/test/component/federated/FedWorkerMatrixCompress.java
@@ -65,14 +65,16 @@ public class FedWorkerMatrixCompress extends FedWorkerBase {
                // local
                final MatrixBlock mbcLocal = 
CompressedMatrixBlockFactory.compress(mb).getLeft();
 
-               // federated
+               // federated. Compression on the worker is async; poll only 
when we expect compression to
+               // match the local result, otherwise a single read is enough.
                final long id = putMatrixBlock(mb);
-               // give the federated site time to compress async.
-               FederatedTestUtils.wait(1000);
-               final MatrixBlock mbr = getMatrixBlock(id);
+               final MatrixBlock mbr = (mbcLocal instanceof 
CompressedMatrixBlock)
+                       ? awaitCompressed(id)
+                       : getMatrixBlock(id);
 
                if(mbcLocal instanceof CompressedMatrixBlock && !(mbr 
instanceof CompressedMatrixBlock))
-                       fail("Invalid result, the federated site did not 
compress the matrix block");
+                       fail("Invalid result, the federated site did not 
compress the matrix block within "
+                               + COMPRESS_TIMEOUT_MS + "ms");
 
                TestUtils.compareMatricesBitAvgDistance(mbcLocal, mbr, 0, 0,
                        "Not equivalent matrix block returned from federated 
site");
diff --git 
a/src/test/java/org/apache/sysds/test/component/federated/FedWorkerMatrixMultiplyWorkload.java
 
b/src/test/java/org/apache/sysds/test/component/federated/FedWorkerMatrixMultiplyWorkload.java
index 06a193368c..59c9a093c4 100644
--- 
a/src/test/java/org/apache/sysds/test/component/federated/FedWorkerMatrixMultiplyWorkload.java
+++ 
b/src/test/java/org/apache/sysds/test/component/federated/FedWorkerMatrixMultiplyWorkload.java
@@ -88,19 +88,16 @@ public class FedWorkerMatrixMultiplyWorkload extends 
FedWorkerBase {
                for(int i = 0; i < 9; i++) // chain left side compressed 
multiplications with idr.
                        ide = matrixMult(ide, idr);
 
-               // give the federated site time to compress async (it should 
already be done, but just to be safe).
-               FederatedTestUtils.wait(1000);
-
-               // Get back the matrix block stored behind mbr that should be 
compressed now.
-               final MatrixBlock mbr_compressed = getMatrixBlock(idr);
+               // Workload-driven compression runs async on the worker; poll 
instead of sleeping a fixed
+               // amount so a slow runner doesn't observe the 
still-uncompressed block.
+               final MatrixBlock mbr_compressed = awaitCompressed(idr);
 
                if(!(mbr_compressed instanceof CompressedMatrixBlock))
-                       fail("Invalid result, the federated site did not 
compress the matrix block based on workload");
+                       fail("Invalid result, the federated site did not 
compress the matrix block based on workload within "
+                               + COMPRESS_TIMEOUT_MS + "ms");
 
                TestUtils.compareMatricesBitAvgDistance(mbcLocal, 
mbr_compressed, 0, 0,
                        "Not equivalent matrix block returned from federated 
site");
        }
 
-
-
 }
diff --git 
a/src/test/java/org/apache/sysds/test/component/federated/FedWorkerReadMatrixCompress.java
 
b/src/test/java/org/apache/sysds/test/component/federated/FedWorkerReadMatrixCompress.java
index ed47a87e1e..d94cd367a1 100644
--- 
a/src/test/java/org/apache/sysds/test/component/federated/FedWorkerReadMatrixCompress.java
+++ 
b/src/test/java/org/apache/sysds/test/component/federated/FedWorkerReadMatrixCompress.java
@@ -65,15 +65,14 @@ public class FedWorkerReadMatrixCompress extends 
FedWorkerBase {
        public void verifyRead() {
                MatrixBlock expected = readCSV();
                Long id = readMatrix(path);
-               // give the federated site time to compress async.
-               FederatedTestUtils.wait(1000);
-               MatrixBlock actual = getMatrixBlock(id);
+               // Compression happens async on the worker; poll instead of 
sleeping a fixed amount.
+               MatrixBlock actual = awaitCompressed(id);
                if(actual instanceof CompressedMatrixBlock){
                        TestUtils.compareMatricesBitAvgDistance(expected, 
actual, 0, 0,
                                "Not equivalent matrix block read from 
federated site");
                }
                else
-                       fail("Did not compress the matrix input");
+                       fail("Did not compress the matrix input within " + 
COMPRESS_TIMEOUT_MS + "ms");
        }
 
        protected MatrixBlock readCSV() {
diff --git 
a/src/test/java/org/apache/sysds/test/component/federated/FederatedTestUtils.java
 
b/src/test/java/org/apache/sysds/test/component/federated/FederatedTestUtils.java
index 4d3796892a..9b589c35f7 100644
--- 
a/src/test/java/org/apache/sysds/test/component/federated/FederatedTestUtils.java
+++ 
b/src/test/java/org/apache/sysds/test/component/federated/FederatedTestUtils.java
@@ -190,13 +190,4 @@ public class FederatedTestUtils {
                        fail("Failed to get response from put Matrix Block");
                }
        }
-
-       protected static void wait(int ms) {
-               try {
-                       Thread.sleep(ms);
-               }
-               catch(Exception e) {
-                       fail("Failed to wait");
-               }
-       }
 }

Reply via email to