Repository: asterixdb Updated Branches: refs/heads/master d90004695 -> 86cbec537
Extensible exception handling in QueryServiceServlet Change-Id: If8037a97f3d0b0febb8caf68e099f1fd24e0ac49 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1836 Tested-by: Jenkins <[email protected]> BAD: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/86cbec53 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/86cbec53 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/86cbec53 Branch: refs/heads/master Commit: 86cbec5370e32fea0a877bd64afabebd06dbb355 Parents: d900046 Author: Dmitry Lychagin <[email protected]> Authored: Mon Jun 19 11:16:14 2017 -0700 Committer: Yingyi Bu <[email protected]> Committed: Mon Jun 19 15:43:13 2017 -0700 ---------------------------------------------------------------------- asterixdb/asterix-app/pom.xml | 4 +++ .../common/AsterixHyracksIntegrationUtil.java | 11 ++++-- .../api/http/server/NCQueryServiceServlet.java | 15 ++++++++ .../api/http/server/QueryServiceServlet.java | 29 +++++++++------- .../http/servlet/ConnectorApiServletTest.java | 21 +++++++++--- .../test/common/CancellationTestExecutor.java | 8 +++-- .../asterix/test/common/TestExecutor.java | 25 +++++++++++--- asterixdb/pom.xml | 5 +++ .../hyracks/http/server/HttpServerHandler.java | 2 +- hyracks-fullstack/hyracks/hyracks-ipc/pom.xml | 4 +++ .../hyracks/ipc/impl/IPCConnectionManager.java | 36 +++++++++++++++++--- .../org/apache/hyracks/ipc/impl/IPCSystem.java | 3 +- 12 files changed, 128 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/86cbec53/asterixdb/asterix-app/pom.xml ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml index 01afdcf..beac085 100644 --- a/asterixdb/asterix-app/pom.xml +++ b/asterixdb/asterix-app/pom.xml @@ -433,6 +433,10 @@ <artifactId>hyracks-net</artifactId> </dependency> <dependency> + <groupId>org.apache.hyracks</groupId> + <artifactId>hyracks-ipc</artifactId> + </dependency> + <dependency> <groupId>javax.xml.bind</groupId> <artifactId>jaxb-api</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/86cbec53/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java index ba98f73..dea5259 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java @@ -202,9 +202,7 @@ public class AsterixHyracksIntegrationUtil { stopNcTheard.join(); } - if (cc != null) { - cc.stop(); - } + stopCC(false); if (deleteOldInstanceData) { deleteTransactionLogs(); @@ -212,6 +210,13 @@ public class AsterixHyracksIntegrationUtil { } } + public void stopCC(boolean terminateNCService) throws Exception { + if (cc != null) { + cc.stop(terminateNCService); + cc = null; + } + } + protected String getDefaultStoragePath() { return joinPath("target", "io", "dir"); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/86cbec53/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java index 2b70685..9547514 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java @@ -20,12 +20,16 @@ package org.apache.asterix.api.http.server; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeoutException; +import java.util.logging.Level; +import io.netty.handler.codec.http.HttpResponseStatus; import org.apache.asterix.algebra.base.ILangExtension; import org.apache.asterix.app.message.ExecuteStatementRequestMessage; import org.apache.asterix.app.message.ExecuteStatementResponseMessage; import org.apache.asterix.app.result.ResultReader; import org.apache.asterix.common.api.IApplicationContext; +import org.apache.asterix.common.config.GlobalConfig; import org.apache.asterix.common.messaging.api.INCMessageBroker; import org.apache.asterix.common.messaging.api.MessageFuture; import org.apache.asterix.om.types.ARecordType; @@ -35,6 +39,7 @@ import org.apache.commons.lang3.tuple.Triple; import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.api.dataset.ResultSetId; import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.ipc.exceptions.IPCException; /** * Query service servlet that can run on NC nodes. @@ -91,4 +96,14 @@ public class NCQueryServiceServlet extends QueryServiceServlet { sessionOutput.out().append(responseMsg.getResult()); } } + + @Override + protected HttpResponseStatus handleExecuteStatementException(Throwable t) { + if (t instanceof IPCException || t instanceof TimeoutException) { + GlobalConfig.ASTERIX_LOGGER.log(Level.WARNING, t.toString(), t); + return HttpResponseStatus.SERVICE_UNAVAILABLE; + } else { + return super.handleExecuteStatementException(t); + } + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/86cbec53/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java index ec90ae9..9ee064e 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java @@ -420,21 +420,10 @@ public class QueryServiceServlet extends AbstractQueryApiServlet { ResultUtil.printStatus(sessionOutput, ResultStatus.SUCCESS); } errorCount = 0; - } catch (AlgebricksException | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError pe) { - GlobalConfig.ASTERIX_LOGGER.log(Level.INFO, pe.getMessage(), pe); - ResultUtil.printError(resultWriter, pe); - ResultUtil.printStatus(sessionOutput, ResultStatus.FATAL); - status = HttpResponseStatus.BAD_REQUEST; - } catch (HyracksException pe) { - GlobalConfig.ASTERIX_LOGGER.log(Level.WARNING, pe.getMessage(), pe); - ResultUtil.printError(resultWriter, pe); - ResultUtil.printStatus(sessionOutput, ResultStatus.FATAL); - status = HttpResponseStatus.INTERNAL_SERVER_ERROR; - } catch (Exception e) { - GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, "Unexpected exception", e); + } catch (Exception | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError e) { + status = handleExecuteStatementException(e); ResultUtil.printError(resultWriter, e); ResultUtil.printStatus(sessionOutput, ResultStatus.FATAL); - status = HttpResponseStatus.INTERNAL_SERVER_ERROR; } finally { if (execStartEnd[0] == -1) { execStartEnd[1] = -1; @@ -475,4 +464,18 @@ public class QueryServiceServlet extends AbstractQueryApiServlet { param.clientContextID, queryCtx); outExecStartEnd[1] = System.nanoTime(); } + + protected HttpResponseStatus handleExecuteStatementException(Throwable t) { + if (t instanceof org.apache.asterix.aqlplus.parser.TokenMgrError || t instanceof TokenMgrError + || t instanceof AlgebricksException) { + GlobalConfig.ASTERIX_LOGGER.log(Level.INFO, t.getMessage(), t); + return HttpResponseStatus.BAD_REQUEST; + } else if (t instanceof HyracksException) { + GlobalConfig.ASTERIX_LOGGER.log(Level.WARNING, t.getMessage(), t); + return HttpResponseStatus.INTERNAL_SERVER_ERROR; + } else { + GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, "Unexpected exception", t); + return HttpResponseStatus.INTERNAL_SERVER_ERROR; + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/86cbec53/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java index 8b1bbe0..3d6543b 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java @@ -48,7 +48,11 @@ import org.apache.hyracks.api.io.FileSplit; import org.apache.hyracks.api.io.ManagedFileSplit; import org.apache.hyracks.http.api.IServletRequest; import org.apache.hyracks.http.api.IServletResponse; +import org.junit.After; +import org.junit.AfterClass; import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import com.fasterxml.jackson.databind.ObjectMapper; @@ -61,10 +65,20 @@ import junit.extensions.PA; public class ConnectorApiServletTest { - @Test - public void testGet() throws Exception { + @BeforeClass + public static void setup() throws Exception { // Starts test asterixdb cluster. SqlppExecutionTest.setUp(); + } + + @AfterClass + public static void teardown() throws Exception { + // Tears down the asterixdb cluster. + SqlppExecutionTest.tearDown(); + } + + @Test + public void testGet() throws Exception { // Configures a test connector api servlet. ConnectorApiServlet let = new ConnectorApiServlet(new ConcurrentHashMap<>(), new String[] { "/" }, (ICcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext()); @@ -113,9 +127,6 @@ public class ConnectorApiServletTest { ArrayNode splits = (ArrayNode) actualResponse.get("splits"); String path = (splits.get(0)).get("path").asText(); Assert.assertTrue(path.endsWith("Metadata/Dataset_idx_Dataset")); - - // Tears down the asterixdb cluster. - SqlppExecutionTest.tearDown(); } @Test http://git-wip-us.apache.org/repos/asf/asterixdb/blob/86cbec53/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java index 1744836..50c2986 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java @@ -29,6 +29,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.function.Predicate; import org.apache.asterix.common.utils.Servlets; import org.apache.asterix.test.runtime.SqlppExecutionWithCancellationTest; @@ -46,14 +47,15 @@ public class CancellationTestExecutor extends TestExecutor { @Override public InputStream executeQueryService(String str, TestCaseContext.OutputFormat fmt, URI uri, - List<TestCase.CompilationUnit.Parameter> params, boolean jsonEncoded, boolean cancellable) - throws Exception { + List<TestCase.CompilationUnit.Parameter> params, boolean jsonEncoded, + Predicate<Integer> responseCodeValidator, boolean cancellable) throws Exception { String clientContextId = UUID.randomUUID().toString(); final List<TestCase.CompilationUnit.Parameter> newParams = cancellable ? upsertParam(params, "client_context_id", clientContextId) : params; Callable<InputStream> query = () -> { try { - return CancellationTestExecutor.super.executeQueryService(str, fmt, uri, newParams, jsonEncoded, true); + return CancellationTestExecutor.super.executeQueryService(str, fmt, uri, newParams, jsonEncoded, + responseCodeValidator, true); } catch (Exception e) { e.printStackTrace(); throw e; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/86cbec53/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java index 35ab1df..ed6a77a 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java @@ -518,17 +518,27 @@ public class TestExecutor { public InputStream executeQueryService(String str, OutputFormat fmt, URI uri, List<CompilationUnit.Parameter> params, boolean jsonEncoded) throws Exception { - return executeQueryService(str, fmt, uri, params, jsonEncoded, false); + return executeQueryService(str, fmt, uri, params, jsonEncoded, null, false); + } + + public InputStream executeQueryService(String str, OutputFormat fmt, URI uri, + List<CompilationUnit.Parameter> params, boolean jsonEncoded, Predicate<Integer> responseCodeValidator) + throws Exception { + return executeQueryService(str, fmt, uri, params, jsonEncoded, responseCodeValidator, false); } protected InputStream executeQueryService(String str, OutputFormat fmt, URI uri, - List<CompilationUnit.Parameter> params, boolean jsonEncoded, boolean cancellable) throws Exception { + List<CompilationUnit.Parameter> params, boolean jsonEncoded, Predicate<Integer> responseCodeValidator, + boolean cancellable) throws Exception { final List<CompilationUnit.Parameter> newParams = upsertParam(params, "format", fmt.mimeType()); HttpUriRequest method = jsonEncoded ? constructPostMethodJson(str, uri, "statement", newParams) : constructPostMethodUrl(str, uri, "statement", newParams); // Set accepted output response type method.setHeader("Accept", OutputFormat.CLEAN_JSON.mimeType()); HttpResponse response = executeHttpRequest(method); + if (responseCodeValidator != null) { + checkResponse(response, responseCodeValidator); + } return response.getEntity().getContent(); } @@ -637,8 +647,13 @@ public class TestExecutor { } public InputStream executeJSONGet(OutputFormat fmt, URI uri) throws Exception { + return executeJSONGet(fmt, uri, code -> code == HttpStatus.SC_OK); + } + + public InputStream executeJSONGet(OutputFormat fmt, URI uri, Predicate<Integer> responseCodeValidator) + throws Exception { HttpUriRequest request = constructGetMethod(uri, fmt, new ArrayList<>()); - HttpResponse response = executeAndCheckHttpRequest(request); + HttpResponse response = executeAndCheckHttpRequest(request, responseCodeValidator); return response.getEntity().getContent(); } @@ -1101,7 +1116,7 @@ public class TestExecutor { } final URI uri = getEndpoint(Servlets.QUERY_SERVICE); if (DELIVERY_IMMEDIATE.equals(delivery)) { - resultStream = executeQueryService(statement, fmt, uri, params, true, true); + resultStream = executeQueryService(statement, fmt, uri, params, true, null, true); resultStream = ResultExtractor.extract(resultStream); } else { String handleVar = getHandleVariable(statement); @@ -1355,7 +1370,7 @@ public class TestExecutor { return uri; } - protected URI getEndpoint(String servlet) throws URISyntaxException { + public URI getEndpoint(String servlet) throws URISyntaxException { return createEndpointURI(getPath(servlet).replaceAll("/\\*$", ""), null); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/86cbec53/asterixdb/pom.xml ---------------------------------------------------------------------- diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml index e49f9aa..dd86bea 100644 --- a/asterixdb/pom.xml +++ b/asterixdb/pom.xml @@ -826,6 +826,11 @@ </dependency> <dependency> <groupId>org.apache.hyracks</groupId> + <artifactId>hyracks-ipc</artifactId> + <version>${hyracks.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hyracks</groupId> <artifactId>algebricks-compiler</artifactId> <version>${algebricks.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/86cbec53/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java index 00b3cb6..e89ed56 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java @@ -78,7 +78,7 @@ public class HttpServerHandler<T extends HttpServer> extends SimpleChannelInboun protected void respond(ChannelHandlerContext ctx, HttpVersion httpVersion, HttpResponseStatus status) { DefaultHttpResponse response = new DefaultHttpResponse(httpVersion, status); - ctx.write(response).addListener(ChannelFutureListener.CLOSE); + ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } private void submit(ChannelHandlerContext ctx, IServlet servlet, FullHttpRequest request) throws IOException { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/86cbec53/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml b/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml index 45c5e04..cc3a513 100644 --- a/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml +++ b/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml @@ -41,6 +41,10 @@ </properties> <dependencies> <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/86cbec53/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java index 9efd70e..d1659a8 100644 --- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java @@ -23,6 +23,7 @@ import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.StandardSocketOptions; import java.nio.ByteBuffer; +import java.nio.channels.Channel; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; @@ -32,13 +33,17 @@ import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.BitSet; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; +import org.apache.commons.io.IOUtils; + public class IPCConnectionManager { private static final Logger LOGGER = Logger.getLogger(IPCConnectionManager.class.getName()); @@ -88,9 +93,10 @@ public class IPCConnectionManager { networkThread.start(); } - void stop() throws IOException { + void stop() { stopped = true; - serverSocketChannel.close(); + IOUtils.closeQuietly(serverSocketChannel); + networkThread.selector.wakeup(); } IPCHandle getIPCHandle(InetSocketAddress remoteAddress, int retries) throws IOException, InterruptedException { @@ -174,6 +180,8 @@ public class IPCConnectionManager { private class NetworkThread extends Thread { private final Selector selector; + private final Set<SocketChannel> openChannels = new HashSet<>(); + public NetworkThread() { super("IPC Network Listener Thread [" + address + "]"); setDaemon(true); @@ -187,6 +195,14 @@ public class IPCConnectionManager { @Override public void run() { try { + doRun(); + } finally { + cleanup(); + } + } + + private void doRun() { + try { serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); } catch (ClosedChannelException e) { throw new RuntimeException(e); @@ -204,6 +220,7 @@ public class IPCConnectionManager { if (!workingPendingConnections.isEmpty()) { for (IPCHandle handle : workingPendingConnections) { SocketChannel channel = SocketChannel.open(); + openChannels.add(channel); channel.setOption(StandardSocketOptions.TCP_NODELAY, true); channel.configureBlocking(false); SelectionKey cKey; @@ -269,7 +286,8 @@ public class IPCConnectionManager { system.getPerformanceCounters().addMessageBytesReceived(len); if (len < 0) { key.cancel(); - channel.close(); + IOUtils.closeQuietly(channel); + openChannels.remove(channel); handle.close(); } else { handle.processIncomingMessages(); @@ -285,7 +303,8 @@ public class IPCConnectionManager { system.getPerformanceCounters().addMessageBytesSent(len); if (len < 0) { key.cancel(); - channel.close(); + IOUtils.closeQuietly(channel); + openChannels.remove(channel); handle.close(); } else if (!writeBuffer.hasRemaining()) { key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); @@ -297,6 +316,7 @@ public class IPCConnectionManager { } else if (key.isAcceptable()) { assert sc == serverSocketChannel; SocketChannel channel = serverSocketChannel.accept(); + openChannels.add(channel); channel.setOption(StandardSocketOptions.TCP_NODELAY, true); channel.configureBlocking(false); IPCHandle handle = new IPCHandle(system, null); @@ -334,6 +354,14 @@ public class IPCConnectionManager { } } + private void cleanup() { + for (Channel channel : openChannels) { + IOUtils.closeQuietly(channel); + } + openChannels.clear(); + IOUtils.closeQuietly(selector); + } + private boolean finishConnect(SocketChannel channel) { boolean connectFinished = false; try { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/86cbec53/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java index 0997e57..d9ab210 100644 --- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java @@ -59,7 +59,8 @@ public class IPCSystem { public void start() { cMgr.start(); } - public void stop() throws IOException{ + + public void stop() { cMgr.stop(); }
