This is an automated email from the ASF dual-hosted git repository. baunsgaard pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/systemds.git
commit 8f5a42c0922d5a6701b27b763bebc8a6ef269c0e Author: Sebastian Baunsgaard <[email protected]> AuthorDate: Mon Jan 13 10:07:16 2025 +0100 [MINOR] Federated Timeout This commit reduces the timeout for federated tests, and enforces the timeout on federated requests. Previously we had some test cases that would run indefinitely, and therefore we would not be able to decipher the log messages (because nothing would be written). This commit changes this by enforcing a strict 16-second execution time for a single federated request and a 1-day timeout for a default federated request. Previously some operations did use the federated timeout. However, it was [...] Closes #2179 --- src/main/java/org/apache/sysds/conf/DMLConfig.java | 2 +- .../federated/FederatedStatistics.java | 3 +- .../federated/FederatedWorkerHandler.java | 3 +- .../controlprogram/federated/FederationMap.java | 41 ++++++++++++++++---- .../cp/ParamservBuiltinCPInstruction.java | 11 ++++-- ...tiReturnParameterizedBuiltinFEDInstruction.java | 44 +++++++++++++--------- src/test/config/SystemDS-MultiTenant-config.xml | 2 +- src/test/config/SystemDS-config.xml | 2 +- .../functions/codegen/RowVectorComparisonTest.java | 1 + .../primitives/part3/FederatedTokenizeTest.java | 1 + 10 files changed, 76 insertions(+), 34 deletions(-) diff --git a/src/main/java/org/apache/sysds/conf/DMLConfig.java b/src/main/java/org/apache/sysds/conf/DMLConfig.java index dd4d3b2457..fd34fa4439 100644 --- a/src/main/java/org/apache/sysds/conf/DMLConfig.java +++ b/src/main/java/org/apache/sysds/conf/DMLConfig.java @@ -201,7 +201,7 @@ public class DMLConfig _defaultVals.put(FLOATING_POINT_PRECISION, "double" ); _defaultVals.put(USE_SSL_FEDERATED_COMMUNICATION, "false"); _defaultVals.put(DEFAULT_FEDERATED_INITIALIZATION_TIMEOUT, "10"); - _defaultVals.put(FEDERATED_TIMEOUT, "-1"); + _defaultVals.put(FEDERATED_TIMEOUT, "86400"); // default 1 day compute timeout. 
_defaultVals.put(FEDERATED_PLANNER, FederatedPlanner.RUNTIME.name()); _defaultVals.put(FEDERATED_PAR_CONN, "-1"); // vcores _defaultVals.put(FEDERATED_PAR_INST, "-1"); // vcores diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedStatistics.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedStatistics.java index e95375cc75..339aabe976 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedStatistics.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedStatistics.java @@ -503,7 +503,8 @@ public class FederatedStatistics { return new ArrayList<>(workerDataObjects.values()); } - public static void addEvent(EventModel event) { + public synchronized static void addEvent(EventModel event) { + // synchronized, because multiple requests can be handled concurrently workerEvents.add(event); } diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java index ceaf61c225..ce21c79825 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java @@ -651,8 +651,7 @@ public class FederatedWorkerHandler extends ChannelInboundHandlerAdapter { // get function and input parameters try { FederatedUDF udf = (FederatedUDF) request.getParam(0); - if(LOG.isDebugEnabled()) - LOG.debug(udf); + LOG.debug(udf); eventStage.operation = udf.getClass().getSimpleName(); diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java index 2574c4f175..a6ae6d5542 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java +++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java @@ -26,7 +26,9 @@ import java.util.List; import java.util.Map.Entry; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -34,6 +36,7 @@ import java.util.stream.Stream; import org.apache.commons.lang3.tuple.Pair; import org.apache.sysds.common.Types.DataType; import org.apache.sysds.common.Types.ValueType; +import org.apache.sysds.conf.ConfigurationManager; import org.apache.sysds.hops.fedplanner.FTypes.AlignType; import org.apache.sysds.hops.fedplanner.FTypes.FType; import org.apache.sysds.lops.RightIndex; @@ -637,11 +640,25 @@ public class FederationMap { * @param forEachFunction function to execute for each pair */ public void forEachParallel(BiFunction<FederatedRange, FederatedData, Void> forEachFunction) { - ExecutorService pool = CommonThreadPool.get(_fedMap.size()); + ExecutorService pool = Executors.newFixedThreadPool(_fedMap.size()); ArrayList<MappingTask> mappingTasks = new ArrayList<>(); for(Pair<FederatedRange, FederatedData> fedMap : _fedMap) mappingTasks.add(new MappingTask(fedMap.getKey(), fedMap.getValue(), forEachFunction, _ID)); - CommonThreadPool.invokeAndShutdown(pool, mappingTasks); + + try { + for(Future<?> t:pool.invokeAll(mappingTasks, ConfigurationManager.getFederatedTimeout(), TimeUnit.SECONDS)){ + if(!t.isDone()) + throw new RuntimeException("Timeout"); + else if(t.isCancelled()) + throw new RuntimeException("Failed"); + } + } + catch(InterruptedException e) { + throw new RuntimeException(e); + } + finally{ + pool.shutdown(); + } } /** @@ -655,15 +672,25 @@ public class FederationMap { * @return the new <code>FederationMap</code> */ public FederationMap mapParallel(long newVarID, BiFunction<FederatedRange, 
FederatedData, Void> mappingFunction) { - ExecutorService pool = CommonThreadPool.get(_fedMap.size()); - + ExecutorService pool = Executors.newFixedThreadPool(_fedMap.size()); FederationMap fedMapCopy = copyWithNewID(_ID); ArrayList<MappingTask> mappingTasks = new ArrayList<>(); for(Pair<FederatedRange, FederatedData> fedMap : fedMapCopy._fedMap) mappingTasks.add(new MappingTask(fedMap.getKey(), fedMap.getValue(), mappingFunction, newVarID)); - CommonThreadPool.invokeAndShutdown(pool, mappingTasks); - fedMapCopy._ID = newVarID; - return fedMapCopy; + try{ + for(Future<?> t : pool.invokeAll(mappingTasks, ConfigurationManager.getFederatedTimeout(), TimeUnit.SECONDS)){ + if(!t.isDone()) + throw new RuntimeException("Timeout"); + else if(t.isCancelled()){ + throw new RuntimeException("Failed"); + } + } + fedMapCopy._ID = newVarID; + return fedMapCopy; + } + catch(Exception e){ + throw new RuntimeException(e); + } } public FederationMap filter(IndexRange ixrange) { diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/ParamservBuiltinCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/ParamservBuiltinCPInstruction.java index 63e8fe1672..be3bf9de11 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/cp/ParamservBuiltinCPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/ParamservBuiltinCPInstruction.java @@ -51,6 +51,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -61,6 +62,7 @@ import org.apache.spark.network.server.TransportServer; import org.apache.spark.util.LongAccumulator; import org.apache.sysds.api.DMLScript; import org.apache.sysds.common.Types.ExecType; +import org.apache.sysds.conf.ConfigurationManager; import 
org.apache.sysds.hops.recompile.Recompiler; import org.apache.sysds.parser.Statement.FederatedPSScheme; import org.apache.sysds.parser.Statement.PSFrequency; @@ -241,13 +243,16 @@ public class ParamservBuiltinCPInstruction extends ParameterizedBuiltinCPInstruc try { // Launch the worker threads and wait for completion - for (Future<Void> ret : es.invokeAll(threads)) - ret.get(); //error handling + for (Future<Void> ret : es.invokeAll(threads, ConfigurationManager.getFederatedTimeout(), TimeUnit.SECONDS)){ + if(!ret.isDone()) + throw new RuntimeException("Failed federated execution"); + // ret.get(); //error handling + } // Fetch the final model from ps ec.setVariable(output.getName(), ps.getResult()); if (DMLScript.STATISTICS) ParamServStatistics.accExecutionTime((long) ParamServStatistics.getExecutionTimer().stop()); - } catch (InterruptedException | ExecutionException e) { + } catch (Exception e) { throw new DMLRuntimeException("ParamservBuiltinCPInstruction: unknown error: ", e); } finally { es.shutdownNow(); diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/MultiReturnParameterizedBuiltinFEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/MultiReturnParameterizedBuiltinFEDInstruction.java index 69e0361ee7..4d7a63cf90 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/fed/MultiReturnParameterizedBuiltinFEDInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/MultiReturnParameterizedBuiltinFEDInstruction.java @@ -35,6 +35,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.sysds.common.Types; import org.apache.sysds.common.Types.DataType; import org.apache.sysds.common.Types.ValueType; +import org.apache.sysds.hops.OptimizerUtils; import org.apache.sysds.hops.fedplanner.FTypes; import org.apache.sysds.hops.fedplanner.FTypes.FType; import org.apache.sysds.lops.PickByCount; @@ -47,10 +48,10 @@ import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest; 
import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest.RequestType; import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse; import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse.ResponseType; -import org.apache.sysds.runtime.frame.data.FrameBlock; import org.apache.sysds.runtime.controlprogram.federated.FederatedUDF; import org.apache.sysds.runtime.controlprogram.federated.FederationMap; import org.apache.sysds.runtime.controlprogram.federated.FederationUtils; +import org.apache.sysds.runtime.frame.data.FrameBlock; import org.apache.sysds.runtime.instructions.InstructionUtils; import org.apache.sysds.runtime.instructions.cp.CPOperand; import org.apache.sysds.runtime.instructions.cp.Data; @@ -175,6 +176,7 @@ public class MultiReturnParameterizedBuiltinFEDInstruction extends ComputationFE try { FederatedResponse response = responseFuture.get(); MultiColumnEncoder encoder = (MultiColumnEncoder) response.getData()[0]; + // merge this encoder into a composite encoder synchronized(finalGlobalEncoder) { finalGlobalEncoder.mergeAt(encoder, columnOffset, (int) (range.getBeginDims()[0] + 1)); @@ -378,24 +380,30 @@ public class MultiReturnParameterizedBuiltinFEDInstruction extends ComputationFE @Override public FederatedResponse execute(ExecutionContext ec, Data... 
data) { - FrameBlock fb = ((FrameObject) data[0]).acquireReadAndRelease(); - - // offset is applied on the Worker to shift the local encoders to their respective column - _encoder.applyColumnOffset(); - // apply transformation - //MatrixBlock mbout = _encoder.apply(fb, OptimizerUtils.getTransformNumThreads()); - // FIXME: Enabling multithreading intermittently hangs - MatrixBlock mbout = _encoder.apply(fb, 1); - - // create output matrix object - MatrixObject mo = ExecutionContext.createMatrixObject(mbout); - - // add it to the list of variables - ec.setVariable(String.valueOf(_outputID), mo); + try{ - // return id handle - return new FederatedResponse( - ResponseType.SUCCESS_EMPTY, mbout.getNonZeros()); + FrameBlock fb = ((FrameObject) data[0]).acquireReadAndRelease(); + + // offset is applied on the Worker to shift the local encoders to their respective column + _encoder.applyColumnOffset(); + // apply transformation + MatrixBlock mbout = _encoder.apply(fb, OptimizerUtils.getTransformNumThreads()); + // FIXME: Enabling multithreading intermittently hangs + // MatrixBlock mbout = _encoder.apply(fb, 1); + + // create output matrix object + MatrixObject mo = ExecutionContext.createMatrixObject(mbout); + + // add it to the list of variables + ec.setVariable(String.valueOf(_outputID), mo); + + // return id handle + return new FederatedResponse( + ResponseType.SUCCESS_EMPTY, mbout.getNonZeros()); + } + catch(Exception e){ + return new FederatedResponse(ResponseType.ERROR); + } } @Override diff --git a/src/test/config/SystemDS-MultiTenant-config.xml b/src/test/config/SystemDS-MultiTenant-config.xml index ad2bcf0ee5..321fcc0b28 100644 --- a/src/test/config/SystemDS-MultiTenant-config.xml +++ b/src/test/config/SystemDS-MultiTenant-config.xml @@ -21,6 +21,6 @@ <!-- The timeout of the federated tests to initialize the federated matrixes --> <sysds.federated.initialization.timeout>30</sysds.federated.initialization.timeout> <!-- The timeout of each instruction sent to 
federated workers --> - <sysds.federated.timeout>128</sysds.federated.timeout> + <sysds.federated.timeout>16</sysds.federated.timeout> <sysds.local.spark>true</sysds.local.spark> </root> diff --git a/src/test/config/SystemDS-config.xml b/src/test/config/SystemDS-config.xml index a6f5ba525f..a899f5c71c 100644 --- a/src/test/config/SystemDS-config.xml +++ b/src/test/config/SystemDS-config.xml @@ -23,5 +23,5 @@ <!-- The timeout of the federated tests to initialize the federated matrixes --> <sysds.federated.initialization.timeout>2</sysds.federated.initialization.timeout> <!-- The timeout of each instruction sent to federated workers --> - <sysds.federated.timeout>128</sysds.federated.timeout> + <sysds.federated.timeout>16</sysds.federated.timeout> </root> diff --git a/src/test/java/org/apache/sysds/test/functions/codegen/RowVectorComparisonTest.java b/src/test/java/org/apache/sysds/test/functions/codegen/RowVectorComparisonTest.java index 3f287bdc07..b5d5a77474 100644 --- a/src/test/java/org/apache/sysds/test/functions/codegen/RowVectorComparisonTest.java +++ b/src/test/java/org/apache/sysds/test/functions/codegen/RowVectorComparisonTest.java @@ -128,6 +128,7 @@ public class RowVectorComparisonTest extends AutomatedTestBase { boolean oldFlag = OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION; ExecMode platformOld = setExecMode(instType); + setOutputBuffering(true); try { diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/part3/FederatedTokenizeTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/part3/FederatedTokenizeTest.java index a79a24a3b5..f8acc4623a 100644 --- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/part3/FederatedTokenizeTest.java +++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/part3/FederatedTokenizeTest.java @@ -79,6 +79,7 @@ public class FederatedTokenizeTest extends AutomatedTestBase { private void runAggregateOperationTest(ExecMode execMode) { 
setExecMode(execMode); + setOutputBuffering(true); String TEST_NAME = TEST_NAME1;
