This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit d5776d1faa0ca97c9f020ee4e4f3f3c7a7a52576 Author: Murtadha Hubail <[email protected]> AuthorDate: Sun Mar 15 22:30:32 2020 +0300 [NO ISSUE][NET] Allow Data Receivers To Report Errors - user model changes: no - storage format changes: no - interface changes: yes Details: - When an error is encountered while reading a result, report an error to the node sending the data to allow it to abort the operation. - Allow FullFrameChannelWriteInterface to report errors even when some data is still pending to be sent. - Add test case to ensure result senders are terminated. Change-Id: Ie7fba6760edb498b88112a7a68b1d0b9f08022b5 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/5323 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Ali Alsuliman <[email protected]> --- .../test/runtime/ResultStreamingFailureTest.java | 83 ++++++++++++++++++++++ .../apache/hyracks/api/channels/IInputChannel.java | 5 ++ .../hyracks/client/result/ResultSetReader.java | 65 ++++++++++------- .../hyracks/comm/channels/NetworkInputChannel.java | 5 ++ .../comm/channels/ResultNetworkInputChannel.java | 6 ++ .../MaterializedPartitionInputChannel.java | 5 ++ .../muxdemux/AbstractChannelWriteInterface.java | 10 +-- .../muxdemux/FullFrameChannelWriteInterface.java | 18 ++--- 8 files changed, 156 insertions(+), 41 deletions(-) diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ResultStreamingFailureTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ResultStreamingFailureTest.java new file mode 100644 index 0000000..3a4823f --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ResultStreamingFailureTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.test.runtime; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil; +import org.apache.http.NameValuePair; +import org.apache.http.client.entity.UrlEncodedFormEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.message.BasicNameValuePair; +import org.apache.hyracks.control.nc.result.ResultPartitionReader; +import org.apache.hyracks.util.Span; +import org.apache.hyracks.util.ThreadDumpUtil; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class ResultStreamingFailureTest { + + private static final AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil(); + + @Before + public void setUp() throws Exception { + integrationUtil.init(true, AsterixHyracksIntegrationUtil.DEFAULT_CONF_FILE); + } + + @After + public void tearDown() throws Exception { + integrationUtil.deinit(true); + } + + @Test + public void resultStreamingFailureTest() throws Exception { + queryAndDropConnection(); + // allow result sender to terminate and ensure no leaks + Span timeout = Span.start(5, TimeUnit.SECONDS); + while (!timeout.elapsed()) { + String threadDump = ThreadDumpUtil.takeDumpString(); + if (!threadDump.contains(ResultPartitionReader.class.getName())) { + return; + } + TimeUnit.SECONDS.sleep(1); + } + throw new AssertionError("found leaking senders in:\n" + ThreadDumpUtil.takeDumpString()); + } + + private void queryAndDropConnection() throws IOException { + try (CloseableHttpClient httpClient = HttpClients.createDefault();) { + final List<NameValuePair> params = new ArrayList<>(); + params.add(new BasicNameValuePair("statement", "select * from range(1, 10000000) r;")); + HttpPost request = new HttpPost("http://localhost:19004/query/service"); + request.setEntity(new UrlEncodedFormEntity(params, StandardCharsets.UTF_8)); + CloseableHttpResponse response = httpClient.execute(request); + Assert.assertEquals(200, response.getStatusLine().getStatusCode()); + // close connection without streaming the result + } + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/channels/IInputChannel.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/channels/IInputChannel.java index 4deba7b..7baf268 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/channels/IInputChannel.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/channels/IInputChannel.java @@ -37,4 +37,9 @@ public interface IInputChannel { public void open(IHyracksCommonContext ctx) throws HyracksDataException; public void close() throws HyracksDataException; + + /** + * Called when a failure is encountered while reading data + */ + void fail(); } diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSetReader.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSetReader.java index c8f5bd9..3c5a3d1 100644 --- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSetReader.java +++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSetReader.java @@ -90,36 +90,43 @@ public class ResultSetReader implements IResultSetReader { public int read(IFrame frame) throws HyracksDataException { frame.reset(); int readSize = 0; - if (isFirstRead() && !hasNextRecord()) { - return readSize; - } - // read until frame is full or all result records have been read - while (readSize < frame.getFrameSize()) { - if (currentRecordMonitor.hasMoreFrames()) { - final ByteBuffer readBuffer = currentRecordChannel.getNextBuffer(); - if (readBuffer == null) { - throw new IllegalStateException("Unexpected empty frame"); - } - currentRecordMonitor.notifyFrameRead(); - if (readSize == 0) { - final int nBlocks = FrameHelper.deserializeNumOfMinFrame(readBuffer); - frame.ensureFrameSize(frame.getMinSize() * nBlocks); - frame.getBuffer().clear(); - } - frame.getBuffer().put(readBuffer); - currentRecordChannel.recycleBuffer(readBuffer); - readSize = frame.getBuffer().position(); - } else { - currentRecordChannel.close(); - if (currentRecordMonitor.failed()) { - throw HyracksDataException.create(ErrorCode.FAILED_TO_READ_RESULT, jobId); - } - if (isLastRecord() || !hasNextRecord()) { - break; + try { + if (isFirstRead() && !hasNextRecord()) { + return readSize; + } + // read until frame is full or all result records have been read + while (readSize < frame.getFrameSize()) { + if (currentRecordMonitor.hasMoreFrames()) { + final ByteBuffer readBuffer = currentRecordChannel.getNextBuffer(); + if (readBuffer == null) { + throw new IllegalStateException("Unexpected empty frame"); + } + currentRecordMonitor.notifyFrameRead(); + if (readSize == 0) { + final int nBlocks = FrameHelper.deserializeNumOfMinFrame(readBuffer); + frame.ensureFrameSize(frame.getMinSize() * nBlocks); + frame.getBuffer().clear(); + } + frame.getBuffer().put(readBuffer); + currentRecordChannel.recycleBuffer(readBuffer); + readSize = frame.getBuffer().position(); + } else { + currentRecordChannel.close(); + if (currentRecordMonitor.failed()) { + throw HyracksDataException.create(ErrorCode.FAILED_TO_READ_RESULT, jobId); + } + if (isLastRecord() || !hasNextRecord()) { + break; + } } } + frame.getBuffer().flip(); + } catch (Exception e) { + if (isLocalFailure()) { + currentRecordChannel.fail(); + } + throw e; } - frame.getBuffer().flip(); return readSize; } @@ -201,6 +208,10 @@ public class ResultSetReader implements IResultSetReader { return knownRecords != null && currentRecord == knownRecords.length - 1; } + private boolean isLocalFailure() { + return currentRecordMonitor != null && !currentRecordMonitor.failed(); + } + private static class ResultInputChannelMonitor implements IInputChannelMonitor { private int availableFrames; diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java index 58664c6..53bb7cd 100644 --- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java +++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java @@ -119,6 +119,11 @@ public class NetworkInputChannel implements IInputChannel { } + @Override + public void fail() { + // do nothing (covered by job lifecycle) + } + private class ReadFullBufferAcceptor implements ICloseableBufferAcceptor { @Override public void accept(ByteBuffer buffer) { diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ResultNetworkInputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ResultNetworkInputChannel.java index 1df39e9..38cf7c4 100644 --- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ResultNetworkInputChannel.java +++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ResultNetworkInputChannel.java @@ -32,6 +32,7 @@ import org.apache.hyracks.api.context.IHyracksCommonContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.result.ResultSetId; +import org.apache.hyracks.net.protocols.muxdemux.AbstractChannelWriteInterface; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -127,6 +128,11 @@ public class ResultNetworkInputChannel implements IInputChannel { } + @Override + public void fail() { + ccb.getWriteInterface().getFullBufferAcceptor().error(AbstractChannelWriteInterface.REMOTE_ERROR_CODE); + } + private class ReadFullBufferAcceptor implements ICloseableBufferAcceptor { @Override public void accept(ByteBuffer buffer) { diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java index 83677f8..2a40eb8 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java @@ -99,6 +99,11 @@ public class MaterializedPartitionInputChannel implements IInputChannel { } + @Override + public void fail() { + // do nothing (covered by job lifecycle) + } + private class FrameWriter implements IFrameWriter { @Override public void open() throws HyracksDataException { diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java index 741ca8c..ee19de3 100644 --- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java +++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java @@ -75,14 +75,14 @@ public abstract class AbstractChannelWriteInterface implements IChannelWriteInte @GuardedBy("ChannelControlBlock") private boolean computeWritability() { - boolean writableDataPresent = currentWriteBuffer != null || !wiFullQueue.isEmpty(); + if (!ecodeSent && ecode.get() == REMOTE_ERROR_CODE) { + return true; + } + boolean writableDataPresent = !ecodeSent && (currentWriteBuffer != null || !wiFullQueue.isEmpty()); if (writableDataPresent) { return credits > 0; } - if (isPendingCloseWrite()) { - return true; - } - return ecode.get() == REMOTE_ERROR_CODE && !ecodeSent; + return isPendingCloseWrite(); } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java index a7be3a6..e542a34 100644 --- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java +++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java @@ -41,7 +41,15 @@ public class FullFrameChannelWriteInterface extends AbstractChannelWriteInterfac if (currentWriteBuffer == null) { currentWriteBuffer = wiFullQueue.poll(); } - if (currentWriteBuffer != null) { + if (!ecodeSent && ecode.get() == REMOTE_ERROR_CODE) { + writerState.getCommand().setChannelId(channelId); + writerState.getCommand().setCommandType(MuxDemuxCommand.CommandType.ERROR); + writerState.getCommand().setData(ecode.get()); + writerState.reset(null, 0, null); + ecodeSent = true; + ccb.reportLocalEOS(); + adjustChannelWritability(); + } else if (currentWriteBuffer != null) { int size = Math.min(currentWriteBuffer.remaining(), credits); if (size > 0) { credits -= size; @@ -55,14 +63,6 @@ public class FullFrameChannelWriteInterface extends AbstractChannelWriteInterfac } else { adjustChannelWritability(); } - } else if (ecode.get() == REMOTE_ERROR_CODE && !ecodeSent) { - writerState.getCommand().setChannelId(channelId); - writerState.getCommand().setCommandType(MuxDemuxCommand.CommandType.ERROR); - writerState.getCommand().setData(REMOTE_ERROR_CODE); - writerState.reset(null, 0, null); - ecodeSent = true; - ccb.reportLocalEOS(); - adjustChannelWritability(); } else if (isPendingCloseWrite()) { writerState.getCommand().setChannelId(channelId); writerState.getCommand().setCommandType(MuxDemuxCommand.CommandType.CLOSE_CHANNEL);
