This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 4e5e63499f217d1f869de7871dba773520beb7c2 Author: Dmitry Lychagin <[email protected]> AuthorDate: Fri Jun 10 12:24:10 2022 -0700 [NO ISSUE][NET] Move ResultSet from servlet to application context - user model changes: no - storage format changes: no - interface changes: no Details: - Move ResultSet instance ownership from servlet context to application context - Add close() methods to HyracksConnection, ResultDirectory, and ResultSet - Move CcApplicationContext from asterix-runtime to asterix-app Change-Id: Id46661bdf62538a901258b5c72c065a3865a0650 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/16384 Contrib: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Dmitry Lychagin <[email protected]> Reviewed-by: Murtadha Al Hubail <[email protected]> Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17346 Reviewed-by: Michael Blow <[email protected]> Tested-by: Michael Blow <[email protected]> --- .../api/http/server/AbstractQueryApiServlet.java | 10 +---- .../apache/asterix/api/http/server/ApiServlet.java | 2 +- .../asterix/api/http/server/ServletUtil.java | 11 +---- .../asterix/app/cc}/CcApplicationContext.java | 50 +++++++++++++++++++--- .../apache/asterix/app/nc/NCAppRuntimeContext.java | 41 +++++++++++++++--- .../apache/asterix/app/result/ResultReader.java | 14 +++++- .../asterix/hyracks/bootstrap/CCApplication.java | 6 +-- .../api/http/servlet/VersionApiServletTest.java | 2 +- .../asterix/app/bootstrap/TestNodeController.java | 2 +- .../asterix/runtime/ClusterStateManagerTest.java | 2 +- .../test/active/ActiveEventsListenerTest.java | 2 +- .../asterix/test/active/ActiveStatsTest.java | 2 +- .../asterix/common/api/IApplicationContext.java | 6 +++ hyracks-fullstack/hyracks/hyracks-client/pom.xml | 5 +++ .../hyracks/client/result/ResultDirectory.java | 8 +++- .../apache/hyracks/client/result/ResultSet.java | 18 ++++++-- .../apache/hyracks/ipc/impl/HyracksConnection.java | 8 +++- 17 files changed, 143 insertions(+), 46 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java index 4dadf55570..bf77c24f96 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java @@ -28,8 +28,6 @@ import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.result.IResultSet; import org.apache.hyracks.http.server.AbstractServlet; -import org.apache.hyracks.ipc.exceptions.IPCException; -import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -61,13 +59,7 @@ public class AbstractQueryApiServlet extends AbstractServlet { } protected IResultSet getResultSet() throws Exception { // NOSONAR - try { - return ServletUtil.getResultSet(getHyracksClientConnection(), appCtx, ctx); - } catch (IPCException e) { - LOGGER.log(Level.WARN, "Failed getting hyracks dataset connection. Resetting hyracks connection.", e); - ctx.put(HYRACKS_CONNECTION_ATTR, appCtx.getHcc()); - return ServletUtil.getResultSet(getHyracksClientConnection(), appCtx, ctx); - } + return ServletUtil.getResultSet(appCtx, ctx); } protected IHyracksClientConnection getHyracksClientConnection() throws Exception { // NOSONAR diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java index 081c69a84e..56ad88e4c6 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java @@ -134,7 +134,7 @@ public class ApiServlet extends AbstractServlet { try { // TODO: warnings should be retrieved from warnings collectors IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR); - IResultSet resultSet = ServletUtil.getResultSet(hcc, appCtx, ctx); + IResultSet resultSet = ServletUtil.getResultSet(appCtx, ctx); IParser parser = parserFactory.createParser(query); List<Statement> statements = parser.parse(); SessionConfig sessionConfig = new SessionConfig(format, true, isSet(executeQuery), true, planFormat); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletUtil.java index 3ac37bf6ac..8d5e4db974 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletUtil.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletUtil.java @@ -24,29 +24,22 @@ import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; -import org.apache.asterix.app.result.ResultReader; import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.common.metadata.DataverseName; import org.apache.commons.codec.DecoderException; import org.apache.commons.codec.net.URLCodec; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.result.IResultSet; -import org.apache.hyracks.client.result.ResultSet; import org.apache.hyracks.http.api.IServletRequest; public class ServletUtil { - static IResultSet getResultSet(IHyracksClientConnection hcc, IApplicationContext appCtx, - final Map<String, Object> ctx) throws Exception { + static IResultSet getResultSet(IApplicationContext appCtx, final Map<String, Object> ctx) throws Exception { IResultSet resultSet = (IResultSet) ctx.get(RESULTSET_ATTR); if (resultSet == null) { synchronized (ctx) { resultSet = (IResultSet) ctx.get(RESULTSET_ATTR); if (resultSet == null) { - resultSet = new ResultSet(hcc, - appCtx.getServiceContext().getControllerService().getNetworkSecurityManager() - .getSocketChannelFactory(), - appCtx.getCompilerProperties().getFrameSize(), ResultReader.NUM_READERS); + resultSet = appCtx.getResultSet(); ctx.put(RESULTSET_ATTR, resultSet); } } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java similarity index 87% rename from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java index 66f0e734ca..880880e851 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java @@ -16,12 +16,13 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.asterix.runtime.utils; +package org.apache.asterix.app.cc; import java.io.IOException; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Supplier; +import org.apache.asterix.app.result.ResultReader; import org.apache.asterix.common.api.IConfigValidator; import org.apache.asterix.common.api.IConfigValidatorFactory; import org.apache.asterix.common.api.ICoordinationService; @@ -56,13 +57,21 @@ import org.apache.asterix.common.transactions.ITxnIdFactory; import org.apache.asterix.runtime.compression.CompressionManager; import org.apache.asterix.runtime.job.listener.NodeJobTracker; import org.apache.asterix.runtime.transaction.ResourceIdManager; +import org.apache.asterix.runtime.utils.BulkTxnIdFactory; +import org.apache.asterix.runtime.utils.ClusterStateManager; +import org.apache.asterix.runtime.utils.NoOpCoordinationService; +import org.apache.asterix.runtime.utils.RequestTracker; +import org.apache.asterix.runtime.utils.RuntimeComponentsProvider; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.application.ICCServiceContext; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.IJobLifecycleListener; +import org.apache.hyracks.api.result.IResultSet; +import org.apache.hyracks.client.result.ResultSet; import org.apache.hyracks.ipc.impl.HyracksConnection; import org.apache.hyracks.storage.common.IStorageManager; +import org.apache.hyracks.util.NetworkUtil; /* * Acts as an holder class for IndexRegistryProvider, AsterixStorageManager @@ -87,7 +96,8 @@ public class CcApplicationContext implements ICcApplicationContext { private MessagingProperties messagingProperties; private NodeProperties nodeProperties; private Supplier<IMetadataBootstrap> metadataBootstrapSupplier; - private IHyracksClientConnection hcc; + private volatile HyracksConnection hcc; + private volatile ResultSet resultSet; private Object extensionManager; private INcLifecycleCoordinator ftStrategy; private IJobLifecycleListener activeLifeCycleListener; @@ -103,7 +113,7 @@ public class CcApplicationContext implements ICcApplicationContext { private final IAdapterFactoryService adapterFactoryService; private final ReentrantReadWriteLock compilationLock = new ReentrantReadWriteLock(true); - public CcApplicationContext(ICCServiceContext ccServiceCtx, IHyracksClientConnection hcc, + public CcApplicationContext(ICCServiceContext ccServiceCtx, HyracksConnection hcc, Supplier<IMetadataBootstrap> metadataBootstrapSupplier, IGlobalRecoveryManager globalRecoveryManager, INcLifecycleCoordinator ftStrategy, IJobLifecycleListener activeLifeCycleListener, IStorageComponentProvider storageComponentProvider, IMetadataLockManager mdLockManager, @@ -188,18 +198,44 @@ public class CcApplicationContext implements ICcApplicationContext { @Override public IHyracksClientConnection getHcc() throws HyracksDataException { - if (!hcc.isConnected()) { + HyracksConnection hc = hcc; + if (!hc.isConnected()) { synchronized (this) { - if (!hcc.isConnected()) { + hc = hcc; + if (!hc.isConnected()) { try { - hcc = new HyracksConnection(hcc.getHost(), hcc.getPort()); + ResultSet rs = resultSet; + resultSet = null; + NetworkUtil.closeQuietly(rs); + + NetworkUtil.closeQuietly(hc); + hcc = hc = new HyracksConnection(hcc.getHost(), hcc.getPort()); + } catch (Exception e) { + throw HyracksDataException.create(e); + } + } + } + } + return hc; + } + + @Override + public IResultSet getResultSet() throws HyracksDataException { + ResultSet rs = resultSet; + if (rs == null) { + synchronized (this) { + rs = resultSet; + if (rs == null) { + try { + resultSet = rs = ResultReader.createResultSet(getHcc(), ccServiceCtx.getControllerService(), + compilerProperties); } catch (Exception e) { throw HyracksDataException.create(e); } } } } - return hcc; + return rs; } @Override diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java index 1a8916836d..a46522e1e7 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java @@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import org.apache.asterix.active.ActiveManager; +import org.apache.asterix.app.result.ResultReader; import org.apache.asterix.common.api.IConfigValidator; import org.apache.asterix.common.api.IConfigValidatorFactory; import org.apache.asterix.common.api.ICoordinationService; @@ -86,6 +87,8 @@ import org.apache.hyracks.api.io.IPersistedResourceRegistry; import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager; import org.apache.hyracks.api.network.INetworkSecurityManager; +import org.apache.hyracks.api.result.IResultSet; +import org.apache.hyracks.client.result.ResultSet; import org.apache.hyracks.control.common.controllers.NCConfig; import org.apache.hyracks.control.nc.NodeControllerService; import org.apache.hyracks.ipc.impl.HyracksConnection; @@ -108,6 +111,7 @@ import org.apache.hyracks.storage.common.file.FileMapManager; import org.apache.hyracks.storage.common.file.ILocalResourceRepositoryFactory; import org.apache.hyracks.storage.common.file.IResourceIdFactory; import org.apache.hyracks.util.MaintainedThreadNameExecutorService; +import org.apache.hyracks.util.NetworkUtil; import org.apache.hyracks.util.cache.CacheManager; import org.apache.hyracks.util.cache.ICacheManager; import org.apache.logging.log4j.Level; @@ -147,7 +151,8 @@ public class NCAppRuntimeContext implements INcApplicationContext { private final NCExtensionManager ncExtensionManager; private final IStorageComponentProvider componentProvider; private final IPersistedResourceRegistry persistedResourceRegistry; - private IHyracksClientConnection hcc; + private volatile HyracksConnection hcc; + private volatile ResultSet resultSet; private IIndexCheckpointManagerProvider indexCheckpointManagerProvider; private IReplicaManager replicaManager; private IReceptionist receptionist; @@ -513,15 +518,22 @@ public class NCAppRuntimeContext implements INcApplicationContext { @Override public IHyracksClientConnection getHcc() throws HyracksDataException { - if (hcc == null || !hcc.isConnected()) { + HyracksConnection hc = hcc; + if (hc == null || !hc.isConnected()) { synchronized (this) { - if (hcc == null || !hcc.isConnected()) { + hc = hcc; + if (hc == null || !hc.isConnected()) { try { + ResultSet rs = resultSet; + resultSet = null; + NetworkUtil.closeQuietly(rs); + NodeControllerService ncSrv = (NodeControllerService) ncServiceContext.getControllerService(); // TODO(mblow): multicc CcId primaryCcId = ncSrv.getPrimaryCcId(); ClusterControllerInfo ccInfo = ncSrv.getNodeParameters(primaryCcId).getClusterControllerInfo(); - hcc = new HyracksConnection(ccInfo.getClientNetAddress(), ccInfo.getClientNetPort(), + NetworkUtil.closeQuietly(hc); + hcc = hc = new HyracksConnection(ccInfo.getClientNetAddress(), ccInfo.getClientNetPort(), ncSrv.getNetworkSecurityManager().getSocketChannelFactory()); } catch (Exception e) { throw HyracksDataException.create(e); @@ -529,7 +541,26 @@ public class NCAppRuntimeContext implements INcApplicationContext { } } } - return hcc; + return hc; + } + + @Override + public IResultSet getResultSet() throws HyracksDataException { + ResultSet rs = resultSet; + if (rs == null) { + synchronized (this) { + rs = resultSet; + if (rs == null) { + try { + resultSet = rs = ResultReader.createResultSet(getHcc(), ncServiceContext.getControllerService(), + compilerProperties); + } catch (Exception e) { + throw HyracksDataException.create(e); + } + } + } + } + return rs; } @Override diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultReader.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultReader.java index 1acae87019..56dfd5e0c2 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultReader.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultReader.java @@ -18,6 +18,8 @@ */ package org.apache.asterix.app.result; +import org.apache.asterix.common.config.CompilerProperties; +import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.comm.IFrame; import org.apache.hyracks.api.comm.IFrameTupleAccessor; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -27,12 +29,14 @@ import org.apache.hyracks.api.result.IResultSet; import org.apache.hyracks.api.result.IResultSetReader; import org.apache.hyracks.api.result.ResultJobRecord.Status; import org.apache.hyracks.api.result.ResultSetId; +import org.apache.hyracks.api.service.IControllerService; +import org.apache.hyracks.client.result.ResultSet; import org.apache.hyracks.dataflow.common.comm.io.ResultFrameTupleAccessor; public class ResultReader { - private IResultSetReader reader; + private final IResultSetReader reader; - private IFrameTupleAccessor frameTupleAccessor; + private final IFrameTupleAccessor frameTupleAccessor; // Number of parallel result reader buffers public static final int NUM_READERS = 1; @@ -57,4 +61,10 @@ public class ResultReader { public IResultMetadata getMetadata() { return reader.getResultMetadata(); } + + public static ResultSet createResultSet(IHyracksClientConnection hcc, IControllerService srv, + CompilerProperties compilerProperties) throws Exception { + return new ResultSet(hcc, srv.getNetworkSecurityManager().getSocketChannelFactory(), + compilerProperties.getFrameSize(), ResultReader.NUM_READERS); + } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java index 8f3deb1cb9..2a66cfdece 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java @@ -53,6 +53,7 @@ import org.apache.asterix.api.http.server.ShutdownApiServlet; import org.apache.asterix.api.http.server.VersionApiServlet; import org.apache.asterix.app.active.ActiveNotificationHandler; import org.apache.asterix.app.cc.CCExtensionManager; +import org.apache.asterix.app.cc.CcApplicationContext; import org.apache.asterix.app.config.ConfigValidator; import org.apache.asterix.app.io.PersistedResourceRegistry; import org.apache.asterix.app.replication.NcLifecycleCoordinator; @@ -85,7 +86,6 @@ import org.apache.asterix.metadata.bootstrap.AsterixStateProxy; import org.apache.asterix.metadata.lock.MetadataLockManager; import org.apache.asterix.metadata.utils.MetadataLockUtil; import org.apache.asterix.runtime.job.resource.JobCapacityController; -import org.apache.asterix.runtime.utils.CcApplicationContext; import org.apache.asterix.translator.IStatementExecutorFactory; import org.apache.asterix.translator.Receptionist; import org.apache.asterix.util.MetadataBuiltinFunctions; @@ -126,7 +126,7 @@ public class CCApplication extends BaseCCApplication { protected WebManager webManager; protected ICcApplicationContext appCtx; private IJobCapacityController jobCapacityController; - private IHyracksClientConnection hcc; + private HyracksConnection hcc; @Override public void init(IServiceContext serviceCtx) throws Exception { @@ -209,7 +209,7 @@ public class CCApplication extends BaseCCApplication { IReceptionistFactory receptionistFactory, IConfigValidatorFactory configValidatorFactory, CCExtensionManager ccExtensionManager, IAdapterFactoryService adapterFactoryService) throws AlgebricksException, IOException { - return new CcApplicationContext(ccServiceCtx, getHcc(), () -> MetadataManager.INSTANCE, globalRecoveryManager, + return new CcApplicationContext(ccServiceCtx, hcc, () -> MetadataManager.INSTANCE, globalRecoveryManager, lifecycleCoordinator, new ActiveNotificationHandler(), componentProvider, new MetadataLockManager(), createMetadataLockUtil(), receptionistFactory, configValidatorFactory, ccExtensionManager, adapterFactoryService); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiServletTest.java index 8dcfa261a0..eb63218be6 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiServletTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiServletTest.java @@ -31,8 +31,8 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.apache.asterix.api.http.server.VersionApiServlet; +import org.apache.asterix.app.cc.CcApplicationContext; import org.apache.asterix.common.config.BuildProperties; -import org.apache.asterix.runtime.utils.CcApplicationContext; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.http.api.IServletRequest; import org.apache.hyracks.http.api.IServletResponse; diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java index 0b808815f1..34696b1592 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java @@ -26,6 +26,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import org.apache.asterix.app.cc.CcApplicationContext; import org.apache.asterix.app.nc.NCAppRuntimeContext; import org.apache.asterix.app.nc.TransactionSubsystem; import org.apache.asterix.common.config.TransactionProperties; @@ -61,7 +62,6 @@ import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor.B import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorNodePushable; import org.apache.asterix.runtime.operators.LSMPrimaryInsertOperatorNodePushable; import org.apache.asterix.runtime.operators.LSMPrimaryUpsertOperatorNodePushable; -import org.apache.asterix.runtime.utils.CcApplicationContext; import org.apache.asterix.test.runtime.ExecutionTestUtil; import org.apache.asterix.transaction.management.runtime.CommitRuntime; import org.apache.asterix.transaction.management.service.logging.LogReader; diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java index b80fa303c0..a2a3b48393 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java @@ -26,6 +26,7 @@ import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; +import org.apache.asterix.app.cc.CcApplicationContext; import org.apache.asterix.app.replication.NcLifecycleCoordinator; import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage; import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; @@ -37,7 +38,6 @@ import org.apache.asterix.common.utils.NcLocalCounters; import org.apache.asterix.hyracks.bootstrap.CCApplication; import org.apache.asterix.runtime.transaction.ResourceIdManager; import org.apache.asterix.runtime.utils.BulkTxnIdFactory; -import org.apache.asterix.runtime.utils.CcApplicationContext; import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.application.ICCServiceContext; diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java index c71e602fa7..e5e33d00be 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java @@ -34,6 +34,7 @@ import org.apache.asterix.active.ActivityState; import org.apache.asterix.active.EntityId; import org.apache.asterix.active.NoRetryPolicyFactory; import org.apache.asterix.app.active.ActiveNotificationHandler; +import org.apache.asterix.app.cc.CcApplicationContext; import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; import org.apache.asterix.common.api.IMetadataLockManager; import org.apache.asterix.common.cluster.IClusterStateManager; @@ -53,7 +54,6 @@ import org.apache.asterix.metadata.lock.MetadataLockManager; import org.apache.asterix.metadata.utils.MetadataLockUtil; import org.apache.asterix.runtime.functions.FunctionCollection; import org.apache.asterix.runtime.functions.FunctionManager; -import org.apache.asterix.runtime.utils.CcApplicationContext; import org.apache.asterix.test.active.TestEventsListener.Behavior; import org.apache.asterix.test.base.TestMethodTracer; import org.apache.asterix.translator.IStatementExecutor; diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java index c51fd95659..cb123bfb92 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java @@ -37,6 +37,7 @@ import org.apache.asterix.algebra.base.ILangExtension.Language; import org.apache.asterix.app.active.ActiveEntityEventsListener; import org.apache.asterix.app.active.ActiveNotificationHandler; import org.apache.asterix.app.cc.CCExtensionManager; +import org.apache.asterix.app.cc.CcApplicationContext; import org.apache.asterix.app.nc.NCAppRuntimeContext; import org.apache.asterix.app.result.ResponsePrinter; import org.apache.asterix.common.exceptions.ErrorCode; @@ -45,7 +46,6 @@ import org.apache.asterix.external.feed.watch.WaitForStateSubscriber; import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataset; -import org.apache.asterix.runtime.utils.CcApplicationContext; import org.apache.asterix.test.runtime.ExecutionTestUtil; import org.apache.asterix.translator.IStatementExecutor; import org.apache.asterix.translator.SessionOutput; diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java index 67f8253689..f4e241c0f2 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java @@ -31,6 +31,7 @@ import org.apache.asterix.common.config.TransactionProperties; import org.apache.hyracks.api.application.IServiceContext; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.result.IResultSet; public interface IApplicationContext { @@ -66,6 +67,11 @@ public interface IApplicationContext { */ IHyracksClientConnection getHcc() throws HyracksDataException; + /** + * @return a result set provider associated with {@link IHyracksClientConnection} + */ + IResultSet getResultSet() throws HyracksDataException; + /** * @return the cluster coordination service. */ diff --git a/hyracks-fullstack/hyracks/hyracks-client/pom.xml b/hyracks-fullstack/hyracks/hyracks-client/pom.xml index 409da1f8b3..66b71fa6b9 100644 --- a/hyracks-fullstack/hyracks/hyracks-client/pom.xml +++ b/hyracks-fullstack/hyracks/hyracks-client/pom.xml @@ -100,6 +100,11 @@ <artifactId>hyracks-control-nc</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.hyracks</groupId> + <artifactId>hyracks-util</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java index b335a9385e..5dd5fff3be 100644 --- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java +++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java @@ -18,6 +18,7 @@ */ package org.apache.hyracks.client.result; +import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; @@ -35,7 +36,7 @@ import org.apache.hyracks.ipc.impl.IPCSystem; import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer; //TODO(madhusudancs): Should this implementation be moved to org.apache.hyracks.client? -public class ResultDirectory implements IResultDirectory { +public class ResultDirectory implements IResultDirectory, Closeable { private final IPCSystem ipc; private final IResultDirectory remoteResultDirectory; @@ -64,4 +65,9 @@ public class ResultDirectory implements IResultDirectory { public IResultMetadata getResultMetadata(JobId jobId, ResultSetId rsId) throws Exception { return remoteResultDirectory.getResultMetadata(jobId, rsId); } + + @Override + public void close() throws IOException { + ipc.stop(); + } } diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java index 4d8767fd26..8a88045fc3 100644 --- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java +++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java @@ -18,6 +18,9 @@ */ package org.apache.hyracks.client.result; +import java.io.Closeable; +import java.io.IOException; + import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.comm.NetworkAddress; import org.apache.hyracks.api.context.IHyracksCommonContext; @@ -25,15 +28,15 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.IIOManager; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.network.ISocketChannelFactory; -import org.apache.hyracks.api.result.IResultDirectory; import org.apache.hyracks.api.result.IResultSet; import org.apache.hyracks.api.result.IResultSetReader; import org.apache.hyracks.api.result.ResultSetId; import org.apache.hyracks.client.net.ClientNetworkManager; import org.apache.hyracks.control.nc.resources.memory.FrameManager; +import org.apache.hyracks.util.NetworkUtil; -public class ResultSet implements IResultSet { - private final IResultDirectory resultDirectory; +public class ResultSet implements IResultSet, Closeable { + private final ResultDirectory resultDirectory; private final ClientNetworkManager netManager; @@ -50,6 +53,15 @@ public class ResultSet implements IResultSet { resultClientCtx = new ResultClientContext(frameSize); } + @Override + public void close() throws IOException { + try { + netManager.stop(); + } finally { + NetworkUtil.closeQuietly(resultDirectory); + } + } + @Override public IResultSetReader createReader(JobId jobId, ResultSetId resultSetId) throws HyracksDataException { IResultSetReader reader = null; diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java index 9351348869..5b65de8cec 100644 --- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java @@ -18,6 +18,7 @@ */ package org.apache.hyracks.ipc.impl; +import java.io.Closeable; import java.io.File; import java.net.InetSocketAddress; import java.net.URL; @@ -70,7 +71,7 @@ import org.apache.logging.log4j.Logger; * * @author vinayakb */ -public final class HyracksConnection implements IHyracksClientConnection { +public final class HyracksConnection implements Closeable, IHyracksClientConnection { private static final Logger LOGGER = LogManager.getLogger(); @@ -122,6 +123,11 @@ public final class HyracksConnection implements IHyracksClientConnection { this(ccHost, ccPort, PlainSocketChannelFactory.INSTANCE); } + @Override + public void close() { + ipc.stop(); + } + @Override public JobStatus getJobStatus(JobId jobId) throws Exception { return hci.getJobStatus(jobId);
