This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit d5776d1faa0ca97c9f020ee4e4f3f3c7a7a52576
Author: Murtadha Hubail <[email protected]>
AuthorDate: Sun Mar 15 22:30:32 2020 +0300

    [NO ISSUE][NET] Allow Data Receivers To Report Errors
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    - When an error is encountered while reading a result,
      report an error to the node sending the data to allow
      it to abort the operation.
    - Allow FullFrameChannelWriteInterface to report errors
      even when some data is still pending to be sent.
    - Add test case to ensure result senders are terminated.
    
    Change-Id: Ie7fba6760edb498b88112a7a68b1d0b9f08022b5
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/5323
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Ali Alsuliman <[email protected]>
---
 .../test/runtime/ResultStreamingFailureTest.java   | 83 ++++++++++++++++++++++
 .../apache/hyracks/api/channels/IInputChannel.java |  5 ++
 .../hyracks/client/result/ResultSetReader.java     | 65 ++++++++++-------
 .../hyracks/comm/channels/NetworkInputChannel.java |  5 ++
 .../comm/channels/ResultNetworkInputChannel.java   |  6 ++
 .../MaterializedPartitionInputChannel.java         |  5 ++
 .../muxdemux/AbstractChannelWriteInterface.java    | 10 +--
 .../muxdemux/FullFrameChannelWriteInterface.java   | 18 ++---
 8 files changed, 156 insertions(+), 41 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ResultStreamingFailureTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ResultStreamingFailureTest.java
new file mode 100644
index 0000000..3a4823f
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ResultStreamingFailureTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.runtime;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.hyracks.control.nc.result.ResultPartitionReader;
+import org.apache.hyracks.util.Span;
+import org.apache.hyracks.util.ThreadDumpUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ResultStreamingFailureTest {
+
+    private static final AsterixHyracksIntegrationUtil integrationUtil = new 
AsterixHyracksIntegrationUtil();
+
+    @Before
+    public void setUp() throws Exception {
+        integrationUtil.init(true, 
AsterixHyracksIntegrationUtil.DEFAULT_CONF_FILE);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        integrationUtil.deinit(true);
+    }
+
+    @Test
+    public void resultStreamingFailureTest() throws Exception {
+        queryAndDropConnection();
+        // allow result sender to terminate and ensure no leaks
+        Span timeout = Span.start(5, TimeUnit.SECONDS);
+        while (!timeout.elapsed()) {
+            String threadDump = ThreadDumpUtil.takeDumpString();
+            if (!threadDump.contains(ResultPartitionReader.class.getName())) {
+                return;
+            }
+            TimeUnit.SECONDS.sleep(1);
+        }
+        throw new AssertionError("found leaking senders in:\n" + 
ThreadDumpUtil.takeDumpString());
+    }
+
+    private void queryAndDropConnection() throws IOException {
+        try (CloseableHttpClient httpClient = HttpClients.createDefault();) {
+            final List<NameValuePair> params = new ArrayList<>();
+            params.add(new BasicNameValuePair("statement", "select * from 
range(1, 10000000) r;"));
+            HttpPost request = new 
HttpPost("http://localhost:19004/query/service");
+            request.setEntity(new UrlEncodedFormEntity(params, 
StandardCharsets.UTF_8));
+            CloseableHttpResponse response = httpClient.execute(request);
+            Assert.assertEquals(200, response.getStatusLine().getStatusCode());
+            // close connection without streaming the result
+        }
+    }
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/channels/IInputChannel.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/channels/IInputChannel.java
index 4deba7b..7baf268 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/channels/IInputChannel.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/channels/IInputChannel.java
@@ -37,4 +37,9 @@ public interface IInputChannel {
     public void open(IHyracksCommonContext ctx) throws HyracksDataException;
 
     public void close() throws HyracksDataException;
+
+    /**
+     * Called when a failure is encountered while reading data
+     */
+    void fail();
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSetReader.java
 
b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSetReader.java
index c8f5bd9..3c5a3d1 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSetReader.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSetReader.java
@@ -90,36 +90,43 @@ public class ResultSetReader implements IResultSetReader {
     public int read(IFrame frame) throws HyracksDataException {
         frame.reset();
         int readSize = 0;
-        if (isFirstRead() && !hasNextRecord()) {
-            return readSize;
-        }
-        // read until frame is full or all result records have been read
-        while (readSize < frame.getFrameSize()) {
-            if (currentRecordMonitor.hasMoreFrames()) {
-                final ByteBuffer readBuffer = 
currentRecordChannel.getNextBuffer();
-                if (readBuffer == null) {
-                    throw new IllegalStateException("Unexpected empty frame");
-                }
-                currentRecordMonitor.notifyFrameRead();
-                if (readSize == 0) {
-                    final int nBlocks = 
FrameHelper.deserializeNumOfMinFrame(readBuffer);
-                    frame.ensureFrameSize(frame.getMinSize() * nBlocks);
-                    frame.getBuffer().clear();
-                }
-                frame.getBuffer().put(readBuffer);
-                currentRecordChannel.recycleBuffer(readBuffer);
-                readSize = frame.getBuffer().position();
-            } else {
-                currentRecordChannel.close();
-                if (currentRecordMonitor.failed()) {
-                    throw 
HyracksDataException.create(ErrorCode.FAILED_TO_READ_RESULT, jobId);
-                }
-                if (isLastRecord() || !hasNextRecord()) {
-                    break;
+        try {
+            if (isFirstRead() && !hasNextRecord()) {
+                return readSize;
+            }
+            // read until frame is full or all result records have been read
+            while (readSize < frame.getFrameSize()) {
+                if (currentRecordMonitor.hasMoreFrames()) {
+                    final ByteBuffer readBuffer = 
currentRecordChannel.getNextBuffer();
+                    if (readBuffer == null) {
+                        throw new IllegalStateException("Unexpected empty 
frame");
+                    }
+                    currentRecordMonitor.notifyFrameRead();
+                    if (readSize == 0) {
+                        final int nBlocks = 
FrameHelper.deserializeNumOfMinFrame(readBuffer);
+                        frame.ensureFrameSize(frame.getMinSize() * nBlocks);
+                        frame.getBuffer().clear();
+                    }
+                    frame.getBuffer().put(readBuffer);
+                    currentRecordChannel.recycleBuffer(readBuffer);
+                    readSize = frame.getBuffer().position();
+                } else {
+                    currentRecordChannel.close();
+                    if (currentRecordMonitor.failed()) {
+                        throw 
HyracksDataException.create(ErrorCode.FAILED_TO_READ_RESULT, jobId);
+                    }
+                    if (isLastRecord() || !hasNextRecord()) {
+                        break;
+                    }
                 }
             }
+            frame.getBuffer().flip();
+        } catch (Exception e) {
+            if (isLocalFailure()) {
+                currentRecordChannel.fail();
+            }
+            throw e;
         }
-        frame.getBuffer().flip();
         return readSize;
     }
 
@@ -201,6 +208,10 @@ public class ResultSetReader implements IResultSetReader {
         return knownRecords != null && currentRecord == knownRecords.length - 
1;
     }
 
+    private boolean isLocalFailure() {
+        return currentRecordMonitor != null && !currentRecordMonitor.failed();
+    }
+
     private static class ResultInputChannelMonitor implements 
IInputChannelMonitor {
 
         private int availableFrames;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
 
b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
index 58664c6..53bb7cd 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
@@ -119,6 +119,11 @@ public class NetworkInputChannel implements IInputChannel {
 
     }
 
+    @Override
+    public void fail() {
+        // do nothing (covered by job lifecycle)
+    }
+
     private class ReadFullBufferAcceptor implements ICloseableBufferAcceptor {
         @Override
         public void accept(ByteBuffer buffer) {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ResultNetworkInputChannel.java
 
b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ResultNetworkInputChannel.java
index 1df39e9..38cf7c4 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ResultNetworkInputChannel.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ResultNetworkInputChannel.java
@@ -32,6 +32,7 @@ import org.apache.hyracks.api.context.IHyracksCommonContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.result.ResultSetId;
+import org.apache.hyracks.net.protocols.muxdemux.AbstractChannelWriteInterface;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -127,6 +128,11 @@ public class ResultNetworkInputChannel implements 
IInputChannel {
 
     }
 
+    @Override
+    public void fail() {
+        
ccb.getWriteInterface().getFullBufferAcceptor().error(AbstractChannelWriteInterface.REMOTE_ERROR_CODE);
+    }
+
     private class ReadFullBufferAcceptor implements ICloseableBufferAcceptor {
         @Override
         public void accept(ByteBuffer buffer) {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
index 83677f8..2a40eb8 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
@@ -99,6 +99,11 @@ public class MaterializedPartitionInputChannel implements 
IInputChannel {
 
     }
 
+    @Override
+    public void fail() {
+        // do nothing (covered by job lifecycle)
+    }
+
     private class FrameWriter implements IFrameWriter {
         @Override
         public void open() throws HyracksDataException {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
 
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
index 741ca8c..ee19de3 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
@@ -75,14 +75,14 @@ public abstract class AbstractChannelWriteInterface 
implements IChannelWriteInte
 
     @GuardedBy("ChannelControlBlock")
     private boolean computeWritability() {
-        boolean writableDataPresent = currentWriteBuffer != null || 
!wiFullQueue.isEmpty();
+        if (!ecodeSent && ecode.get() == REMOTE_ERROR_CODE) {
+            return true;
+        }
+        boolean writableDataPresent = !ecodeSent && (currentWriteBuffer != 
null || !wiFullQueue.isEmpty());
         if (writableDataPresent) {
             return credits > 0;
         }
-        if (isPendingCloseWrite()) {
-            return true;
-        }
-        return ecode.get() == REMOTE_ERROR_CODE && !ecodeSent;
+        return isPendingCloseWrite();
     }
 
     @Override
diff --git 
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java
 
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java
index a7be3a6..e542a34 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java
@@ -41,7 +41,15 @@ public class FullFrameChannelWriteInterface extends 
AbstractChannelWriteInterfac
         if (currentWriteBuffer == null) {
             currentWriteBuffer = wiFullQueue.poll();
         }
-        if (currentWriteBuffer != null) {
+        if (!ecodeSent && ecode.get() == REMOTE_ERROR_CODE) {
+            writerState.getCommand().setChannelId(channelId);
+            
writerState.getCommand().setCommandType(MuxDemuxCommand.CommandType.ERROR);
+            writerState.getCommand().setData(ecode.get());
+            writerState.reset(null, 0, null);
+            ecodeSent = true;
+            ccb.reportLocalEOS();
+            adjustChannelWritability();
+        } else if (currentWriteBuffer != null) {
             int size = Math.min(currentWriteBuffer.remaining(), credits);
             if (size > 0) {
                 credits -= size;
@@ -55,14 +63,6 @@ public class FullFrameChannelWriteInterface extends 
AbstractChannelWriteInterfac
             } else {
                 adjustChannelWritability();
             }
-        } else if (ecode.get() == REMOTE_ERROR_CODE && !ecodeSent) {
-            writerState.getCommand().setChannelId(channelId);
-            
writerState.getCommand().setCommandType(MuxDemuxCommand.CommandType.ERROR);
-            writerState.getCommand().setData(REMOTE_ERROR_CODE);
-            writerState.reset(null, 0, null);
-            ecodeSent = true;
-            ccb.reportLocalEOS();
-            adjustChannelWritability();
         } else if (isPendingCloseWrite()) {
             writerState.getCommand().setChannelId(channelId);
             
writerState.getCommand().setCommandType(MuxDemuxCommand.CommandType.CLOSE_CHANNEL);

Reply via email to