This is an automated email from the ASF dual-hosted git repository. mboehm7 pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/systemds.git
commit 3027192d24a8b68276d17935ae366b4fb61fa5f9 Author: Matthias Boehm <[email protected]> AuthorDate: Thu Jun 29 21:34:11 2023 +0200 [SYSTEMDS-3586] Fix variable release on errors in federated workers This patch improves the robustness and error handling of federated workers for batches of federated requests. So far, if a federated request for EXEC_INST caused an exception, the respective instruction did not release its inputs, thus causing incomprehensible exceptions on subsequent requests of the same batch. Even more severely, the state of variables was corrupted, which can be problematic because federated workers are stateful servers. We now perform a dedicated release, but only on exceptions during instruction execution, to minimize overhead. --- .../runtime/controlprogram/LocalVariableMap.java | 9 +++++++ .../federated/FederatedWorkerHandler.java | 31 +++++++++++++++------- 2 files changed, 30 insertions(+), 10 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/LocalVariableMap.java b/src/main/java/org/apache/sysds/runtime/controlprogram/LocalVariableMap.java index 5031f3f785..c5e3a58969 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/LocalVariableMap.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/LocalVariableMap.java @@ -28,6 +28,7 @@ import java.util.Set; import java.util.StringTokenizer; import org.apache.sysds.runtime.controlprogram.caching.CacheableData; +import org.apache.sysds.runtime.controlprogram.caching.CacheableData.CacheStatus; import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence; import org.apache.sysds.runtime.instructions.cp.Data; import org.apache.sysds.runtime.instructions.cp.ListObject; @@ -158,6 +159,14 @@ public class LocalVariableMap implements Cloneable .filter(d -> (d instanceof CacheableData)).count(); } + public void releasePinnedData() { + localMap.values().stream() + .filter(d -> (d instanceof CacheableData)) + .map(d -> (CacheableData<?>) d) + 
.filter(d -> d.getStatus() == CacheStatus.READ) + .forEach(d -> d.release()); + } + public String serialize() { StringBuilder sb = new StringBuilder(); int count = 0; diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java index 4c2f437a81..9bfb32dbee 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java @@ -120,6 +120,14 @@ public class FederatedWorkerHandler extends ChannelInboundHandlerAdapter { _flt = flt; _frc = frc; _fan = fan; + + if(DMLScript.LINEAGE) { + // Compiler assisted optimizations are not applicable for Fed workers. + // e.g. isMarkedForCaching fails as output operands are saved in the + // symbol table only after the instruction execution finishes. + // NOTE: In shared JVM, this will disable compiler assistance even for the coordinator + LineageCacheConfig.setCompAssRW(false); + } } public FederatedWorkerHandler(FederatedLookupTable flt, FederatedReadCache frc, FederatedWorkloadAnalyzer fan, Timing timing) { @@ -300,7 +308,8 @@ public class FederatedWorkerHandler extends ChannelInboundHandlerAdapter { } private FederatedResponse executeCommand(FederatedRequest request, ExecutionContextMap ecm, EventStageModel eventStage) - throws DMLPrivacyException, FederatedWorkerHandlerException, Exception { + throws DMLPrivacyException, FederatedWorkerHandlerException, Exception + { final RequestType method = request.getType(); FederatedResponse result = null; @@ -616,15 +625,17 @@ public class FederatedWorkerHandler extends ChannelInboundHandlerAdapter { final BasicProgramBlock pb = new BasicProgramBlock(null); pb.getInstructions().clear(); pb.getInstructions().add(ins); - - if(DMLScript.LINEAGE) - // Compiler assisted optimizations are not applicable for Fed workers. 
- // e.g. isMarkedForCaching fails as output operands are saved in the - // symbol table only after the instruction execution finishes. - // NOTE: In shared JVM, this will disable compiler assistance even for the coordinator - LineageCacheConfig.setCompAssRW(false); - - pb.execute(ec); // execute single instruction + + try { + // execute single instruction + pb.execute(ec); + } + catch(Exception ex) { + // ensure all variables are properly unpinned, even in case + // of failures because federated workers are stateful servers + ec.getVariables().releasePinnedData(); + throw ex; + } } private static void adaptToWorkload(ExecutionContext ec, FederatedWorkloadAnalyzer fan, long tid, Instruction ins){
