This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new abacd66  [MINOR] New error object written in case failing Channel
abacd66 is described below

commit abacd66ecd69ef8f8daa83e555e26f26b33c8e03
Author: baunsgaard <[email protected]>
AuthorDate: Thu Aug 20 18:31:52 2020 +0200

    [MINOR] New error object written in case failing Channel
---
 .../federated/FederatedWorkerHandler.java             | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
index ae8f2c8..b0c75df 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
@@ -293,11 +293,20 @@ public class FederatedWorkerHandler extends ChannelInboundHandlerAdapter {
 
        private static class CloseListener implements ChannelFutureListener {
                @Override
-               public void operationComplete(ChannelFuture channelFuture) throws InterruptedException, DMLRuntimeException {
-                       if (!channelFuture.isSuccess())
-                               throw new DMLRuntimeException("Federated Worker Write failed");
-                       PrivacyMonitor.clearCheckedConstraints();
-                       channelFuture.channel().close().sync();
+               public void operationComplete(ChannelFuture channelFuture) throws InterruptedException {
+                       if (!channelFuture.isSuccess()){
+                               log.fatal("Federated Worker Write failed");
+                               channelFuture
+                                       .channel()
+                                       .writeAndFlush(
+                                               new FederatedResponse(ResponseType.ERROR,
+                                               new FederatedWorkerHandlerException("Error while sending response.")))
+                                       .channel().close().sync();
+                       }
+                       else {
+                               PrivacyMonitor.clearCheckedConstraints();
+                               channelFuture.channel().close().sync();
+                       }
                }
        }
 }

Reply via email to