This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit 8f5a42c0922d5a6701b27b763bebc8a6ef269c0e
Author: Sebastian Baunsgaard <[email protected]>
AuthorDate: Mon Jan 13 10:07:16 2025 +0100

    [MINOR] Federated Timeout
    
    This commit reduces the timeout for federated tests and enforces the timeout 
on federated requests. Previously, some test cases would run indefinitely, and 
therefore we could not decipher the log messages (because nothing would be 
written). This commit changes that by enforcing a strict 16-second execution 
limit for a single federated request in the test configuration and a default 
1-day timeout for federated requests otherwise. Previously, some operations 
already used the federated timeout. However, it was [...]
    
    Closes #2179
---
 src/main/java/org/apache/sysds/conf/DMLConfig.java |  2 +-
 .../federated/FederatedStatistics.java             |  3 +-
 .../federated/FederatedWorkerHandler.java          |  3 +-
 .../controlprogram/federated/FederationMap.java    | 41 ++++++++++++++++----
 .../cp/ParamservBuiltinCPInstruction.java          | 11 ++++--
 ...tiReturnParameterizedBuiltinFEDInstruction.java | 44 +++++++++++++---------
 src/test/config/SystemDS-MultiTenant-config.xml    |  2 +-
 src/test/config/SystemDS-config.xml                |  2 +-
 .../functions/codegen/RowVectorComparisonTest.java |  1 +
 .../primitives/part3/FederatedTokenizeTest.java    |  1 +
 10 files changed, 76 insertions(+), 34 deletions(-)

diff --git a/src/main/java/org/apache/sysds/conf/DMLConfig.java 
b/src/main/java/org/apache/sysds/conf/DMLConfig.java
index dd4d3b2457..fd34fa4439 100644
--- a/src/main/java/org/apache/sysds/conf/DMLConfig.java
+++ b/src/main/java/org/apache/sysds/conf/DMLConfig.java
@@ -201,7 +201,7 @@ public class DMLConfig
                _defaultVals.put(FLOATING_POINT_PRECISION, "double" );
                _defaultVals.put(USE_SSL_FEDERATED_COMMUNICATION, "false");
                _defaultVals.put(DEFAULT_FEDERATED_INITIALIZATION_TIMEOUT, 
"10");
-               _defaultVals.put(FEDERATED_TIMEOUT,      "-1");
+               _defaultVals.put(FEDERATED_TIMEOUT,      "86400"); // default 1 
day compute timeout.
                _defaultVals.put(FEDERATED_PLANNER,      
FederatedPlanner.RUNTIME.name());
                _defaultVals.put(FEDERATED_PAR_CONN,     "-1"); // vcores
                _defaultVals.put(FEDERATED_PAR_INST,     "-1"); // vcores
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedStatistics.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedStatistics.java
index e95375cc75..339aabe976 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedStatistics.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedStatistics.java
@@ -503,7 +503,8 @@ public class FederatedStatistics {
                return new ArrayList<>(workerDataObjects.values());
        }
 
-       public static void addEvent(EventModel event) {
+       public synchronized static void addEvent(EventModel event) { 
+               // synchronized, because multiple requests can be handled 
concurrently
                workerEvents.add(event);
        }
 
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
index ceaf61c225..ce21c79825 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
@@ -651,8 +651,7 @@ public class FederatedWorkerHandler extends 
ChannelInboundHandlerAdapter {
                // get function and input parameters
                try {
                        FederatedUDF udf = (FederatedUDF) request.getParam(0);
-                       if(LOG.isDebugEnabled())
-                               LOG.debug(udf);
+                       LOG.debug(udf);
 
                        eventStage.operation = udf.getClass().getSimpleName();
 
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java
index 2574c4f175..a6ae6d5542 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java
@@ -26,7 +26,9 @@ import java.util.List;
 import java.util.Map.Entry;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.function.BiFunction;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
@@ -34,6 +36,7 @@ import java.util.stream.Stream;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.sysds.common.Types.DataType;
 import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.hops.fedplanner.FTypes.AlignType;
 import org.apache.sysds.hops.fedplanner.FTypes.FType;
 import org.apache.sysds.lops.RightIndex;
@@ -637,11 +640,25 @@ public class FederationMap {
         * @param forEachFunction function to execute for each pair
         */
        public void forEachParallel(BiFunction<FederatedRange, FederatedData, 
Void> forEachFunction) {
-               ExecutorService pool = CommonThreadPool.get(_fedMap.size());
+               ExecutorService pool = 
Executors.newFixedThreadPool(_fedMap.size());
                ArrayList<MappingTask> mappingTasks = new ArrayList<>();
                for(Pair<FederatedRange, FederatedData> fedMap : _fedMap)
                        mappingTasks.add(new MappingTask(fedMap.getKey(), 
fedMap.getValue(), forEachFunction, _ID));
-               CommonThreadPool.invokeAndShutdown(pool, mappingTasks);
+
+               try {
+                       for(Future<?> t:pool.invokeAll(mappingTasks, 
ConfigurationManager.getFederatedTimeout(), TimeUnit.SECONDS)){
+                               if(!t.isDone())
+                                       throw new RuntimeException("Timeout");
+                               else if(t.isCancelled())
+                                       throw new RuntimeException("Failed");
+                       }
+               }
+               catch(InterruptedException e) {
+                       throw new RuntimeException(e);
+               }
+               finally{
+                       pool.shutdown();
+               }
        }
 
        /**
@@ -655,15 +672,25 @@ public class FederationMap {
         * @return the new <code>FederationMap</code>
         */
        public FederationMap mapParallel(long newVarID, 
BiFunction<FederatedRange, FederatedData, Void> mappingFunction) {
-               ExecutorService pool = CommonThreadPool.get(_fedMap.size());
-
+               ExecutorService pool = 
Executors.newFixedThreadPool(_fedMap.size());
                FederationMap fedMapCopy = copyWithNewID(_ID);
                ArrayList<MappingTask> mappingTasks = new ArrayList<>();
                for(Pair<FederatedRange, FederatedData> fedMap : 
fedMapCopy._fedMap)
                        mappingTasks.add(new MappingTask(fedMap.getKey(), 
fedMap.getValue(), mappingFunction, newVarID));
-               CommonThreadPool.invokeAndShutdown(pool, mappingTasks);
-               fedMapCopy._ID = newVarID;
-               return fedMapCopy;
+               try{
+                       for(Future<?> t : pool.invokeAll(mappingTasks, 
ConfigurationManager.getFederatedTimeout(), TimeUnit.SECONDS)){
+                               if(!t.isDone())
+                                       throw new RuntimeException("Timeout");
+                               else if(t.isCancelled()){
+                                       throw new RuntimeException("Failed");
+                               }
+                       }
+                       fedMapCopy._ID = newVarID;
+                       return fedMapCopy;
+               }
+               catch(Exception e){
+                       throw new RuntimeException(e);
+               }
        }
 
        public FederationMap filter(IndexRange ixrange) {
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/ParamservBuiltinCPInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/ParamservBuiltinCPInstruction.java
index 63e8fe1672..be3bf9de11 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/ParamservBuiltinCPInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/ParamservBuiltinCPInstruction.java
@@ -51,6 +51,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -61,6 +62,7 @@ import org.apache.spark.network.server.TransportServer;
 import org.apache.spark.util.LongAccumulator;
 import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.common.Types.ExecType;
+import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.hops.recompile.Recompiler;
 import org.apache.sysds.parser.Statement.FederatedPSScheme;
 import org.apache.sysds.parser.Statement.PSFrequency;
@@ -241,13 +243,16 @@ public class ParamservBuiltinCPInstruction extends 
ParameterizedBuiltinCPInstruc
 
                try {
                        // Launch the worker threads and wait for completion
-                       for (Future<Void> ret : es.invokeAll(threads))
-                               ret.get(); //error handling
+                       for (Future<Void> ret : es.invokeAll(threads, 
ConfigurationManager.getFederatedTimeout(), TimeUnit.SECONDS)){
+                               if(!ret.isDone())
+                                       throw new RuntimeException("Failed 
federated execution");
+                               // ret.get(); //error handling
+                       }
                        // Fetch the final model from ps
                        ec.setVariable(output.getName(), ps.getResult());
                        if (DMLScript.STATISTICS)
                                ParamServStatistics.accExecutionTime((long) 
ParamServStatistics.getExecutionTimer().stop());
-               } catch (InterruptedException | ExecutionException e) {
+               } catch (Exception e) {
                        throw new 
DMLRuntimeException("ParamservBuiltinCPInstruction: unknown error: ", e);
                } finally {
                        es.shutdownNow();
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/fed/MultiReturnParameterizedBuiltinFEDInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/fed/MultiReturnParameterizedBuiltinFEDInstruction.java
index 69e0361ee7..4d7a63cf90 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/fed/MultiReturnParameterizedBuiltinFEDInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/fed/MultiReturnParameterizedBuiltinFEDInstruction.java
@@ -35,6 +35,7 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.sysds.common.Types;
 import org.apache.sysds.common.Types.DataType;
 import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.hops.fedplanner.FTypes;
 import org.apache.sysds.hops.fedplanner.FTypes.FType;
 import org.apache.sysds.lops.PickByCount;
@@ -47,10 +48,10 @@ import 
org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
 import 
org.apache.sysds.runtime.controlprogram.federated.FederatedRequest.RequestType;
 import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
 import 
org.apache.sysds.runtime.controlprogram.federated.FederatedResponse.ResponseType;
-import org.apache.sysds.runtime.frame.data.FrameBlock;
 import org.apache.sysds.runtime.controlprogram.federated.FederatedUDF;
 import org.apache.sysds.runtime.controlprogram.federated.FederationMap;
 import org.apache.sysds.runtime.controlprogram.federated.FederationUtils;
+import org.apache.sysds.runtime.frame.data.FrameBlock;
 import org.apache.sysds.runtime.instructions.InstructionUtils;
 import org.apache.sysds.runtime.instructions.cp.CPOperand;
 import org.apache.sysds.runtime.instructions.cp.Data;
@@ -175,6 +176,7 @@ public class MultiReturnParameterizedBuiltinFEDInstruction 
extends ComputationFE
                                try {
                                        FederatedResponse response = 
responseFuture.get();
                                        MultiColumnEncoder encoder = 
(MultiColumnEncoder) response.getData()[0];
+
                                        // merge this encoder into a composite 
encoder
                                        synchronized(finalGlobalEncoder) {
                                                
finalGlobalEncoder.mergeAt(encoder, columnOffset, (int) 
(range.getBeginDims()[0] + 1));
@@ -378,24 +380,30 @@ public class 
MultiReturnParameterizedBuiltinFEDInstruction extends ComputationFE
 
                @Override
                public FederatedResponse execute(ExecutionContext ec, Data... 
data) {
-                       FrameBlock fb = ((FrameObject) 
data[0]).acquireReadAndRelease();
-
-                       // offset is applied on the Worker to shift the local 
encoders to their respective column
-                       _encoder.applyColumnOffset();
-                       // apply transformation
-                       //MatrixBlock mbout = _encoder.apply(fb, 
OptimizerUtils.getTransformNumThreads());
-                       // FIXME: Enabling multithreading intermittently hangs
-                       MatrixBlock mbout = _encoder.apply(fb, 1);
-
-                       // create output matrix object
-                       MatrixObject mo = 
ExecutionContext.createMatrixObject(mbout);
-
-                       // add it to the list of variables
-                       ec.setVariable(String.valueOf(_outputID), mo);
+                       try{
 
-                       // return id handle
-                       return new FederatedResponse(
-                               ResponseType.SUCCESS_EMPTY, 
mbout.getNonZeros());
+                               FrameBlock fb = ((FrameObject) 
data[0]).acquireReadAndRelease();
+       
+                               // offset is applied on the Worker to shift the 
local encoders to their respective column
+                               _encoder.applyColumnOffset();
+                               // apply transformation
+                               MatrixBlock mbout = _encoder.apply(fb, 
OptimizerUtils.getTransformNumThreads());
+                               // FIXME: Enabling multithreading 
intermittently hangs
+                               // MatrixBlock mbout = _encoder.apply(fb, 1);
+       
+                               // create output matrix object
+                               MatrixObject mo = 
ExecutionContext.createMatrixObject(mbout);
+       
+                               // add it to the list of variables
+                               ec.setVariable(String.valueOf(_outputID), mo);
+       
+                               // return id handle
+                               return new FederatedResponse(
+                                       ResponseType.SUCCESS_EMPTY, 
mbout.getNonZeros());
+                       }
+                       catch(Exception e){
+                               return new 
FederatedResponse(ResponseType.ERROR);
+                       }
                }
 
                @Override
diff --git a/src/test/config/SystemDS-MultiTenant-config.xml 
b/src/test/config/SystemDS-MultiTenant-config.xml
index ad2bcf0ee5..321fcc0b28 100644
--- a/src/test/config/SystemDS-MultiTenant-config.xml
+++ b/src/test/config/SystemDS-MultiTenant-config.xml
@@ -21,6 +21,6 @@
    <!-- The timeout of the federated tests to initialize the federated 
matrixes -->
    
<sysds.federated.initialization.timeout>30</sysds.federated.initialization.timeout>
    <!-- The timeout of each instruction sent to federated workers -->
-   <sysds.federated.timeout>128</sysds.federated.timeout>
+   <sysds.federated.timeout>16</sysds.federated.timeout>
    <sysds.local.spark>true</sysds.local.spark>
 </root>
diff --git a/src/test/config/SystemDS-config.xml 
b/src/test/config/SystemDS-config.xml
index a6f5ba525f..a899f5c71c 100644
--- a/src/test/config/SystemDS-config.xml
+++ b/src/test/config/SystemDS-config.xml
@@ -23,5 +23,5 @@
    <!-- The timeout of the federated tests to initialize the federated 
matrixes -->
    
<sysds.federated.initialization.timeout>2</sysds.federated.initialization.timeout>
    <!-- The timeout of each instruction sent to federated workers -->
-   <sysds.federated.timeout>128</sysds.federated.timeout>
+   <sysds.federated.timeout>16</sysds.federated.timeout>
 </root>
diff --git 
a/src/test/java/org/apache/sysds/test/functions/codegen/RowVectorComparisonTest.java
 
b/src/test/java/org/apache/sysds/test/functions/codegen/RowVectorComparisonTest.java
index 3f287bdc07..b5d5a77474 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/codegen/RowVectorComparisonTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/codegen/RowVectorComparisonTest.java
@@ -128,6 +128,7 @@ public class RowVectorComparisonTest extends 
AutomatedTestBase
        {       
                boolean oldFlag = OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION;
                ExecMode platformOld = setExecMode(instType);
+               setOutputBuffering(true);
                
                try
                {
diff --git 
a/src/test/java/org/apache/sysds/test/functions/federated/primitives/part3/FederatedTokenizeTest.java
 
b/src/test/java/org/apache/sysds/test/functions/federated/primitives/part3/FederatedTokenizeTest.java
index a79a24a3b5..f8acc4623a 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/federated/primitives/part3/FederatedTokenizeTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/federated/primitives/part3/FederatedTokenizeTest.java
@@ -79,6 +79,7 @@ public class FederatedTokenizeTest extends AutomatedTestBase {
 
        private void runAggregateOperationTest(ExecMode execMode) {
                setExecMode(execMode);
+               setOutputBuffering(true);
 
                String TEST_NAME = TEST_NAME1;
 

Reply via email to