Repository: asterixdb-bad Updated Branches: refs/heads/master 4345dffae -> 83f6d53b0
Separate Predistributed Jobs from other Active Jobs Change-Id: I18de53d55bcd2b423b9c8f4521fc8c3db1f432e1 Project: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/commit/83f6d53b Tree: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/tree/83f6d53b Diff: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/diff/83f6d53b Branch: refs/heads/master Commit: 83f6d53b025a81d8deba750f74b8ce9d03607961 Parents: 4345dff Author: Till Westmann <[email protected]> Authored: Wed Jul 26 01:24:33 2017 -0700 Committer: Till Westmann <[email protected]> Committed: Wed Jul 26 01:24:33 2017 -0700 ---------------------------------------------------------------------- .../bad/lang/BADQueryTranslatorFactory.java | 3 +- .../asterix/bad/lang/BADStatementExecutor.java | 9 +- .../lang/statement/ChannelDropStatement.java | 15 +- .../statement/ChannelSubscribeStatement.java | 2 +- .../statement/ChannelUnsubscribeStatement.java | 2 +- .../lang/statement/CreateChannelStatement.java | 60 ++++---- .../statement/CreateProcedureStatement.java | 43 +++--- .../statement/ExecuteProcedureStatement.java | 10 +- .../lang/statement/ProcedureDropStatement.java | 12 +- .../bad/metadata/BADMetadataExtension.java | 4 +- .../bad/metadata/BrokerTupleTranslator.java | 2 +- .../bad/metadata/ChannelTupleTranslator.java | 2 +- .../metadata/PrecompiledJobEventListener.java | 143 +++++++++++++++++-- .../bad/metadata/ProcedureTupleTranslator.java | 2 +- 14 files changed, 209 insertions(+), 100 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/83f6d53b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java index 09a436b..8f5d520 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java @@ -33,7 +33,6 @@ public class BADQueryTranslatorFactory extends DefaultStatementExecutorFactory { @Override public QueryTranslator create(ICcApplicationContext appCtx, List<Statement> statements, SessionOutput output, ILangCompilationProvider compilationProvider, IStorageComponentProvider storageComponentProvider) { - return new BADStatementExecutor(appCtx, statements, output, compilationProvider, storageComponentProvider, - executorService); + return new BADStatementExecutor(appCtx, statements, output, compilationProvider, executorService); } } http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/83f6d53b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java index 5775e9b..bc17a7d 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java @@ -28,7 +28,6 @@ import org.apache.asterix.bad.lang.statement.ProcedureDropStatement; import org.apache.asterix.bad.metadata.Broker; import org.apache.asterix.bad.metadata.Channel; import org.apache.asterix.bad.metadata.Procedure; -import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.functions.FunctionSignature; import org.apache.asterix.compiler.provider.ILangCompilationProvider; @@ -44,9 +43,8 @@ import org.apache.hyracks.api.client.IHyracksClientConnection; public class BADStatementExecutor extends QueryTranslator { public BADStatementExecutor(ICcApplicationContext appCtx, List<Statement> statements, SessionOutput output, - ILangCompilationProvider compliationProvider, IStorageComponentProvider storageComponentProvider, - ExecutorService executorService) { - super(appCtx, statements, output, compliationProvider, storageComponentProvider, executorService); + ILangCompilationProvider compliationProvider, ExecutorService executorService) { + super(appCtx, statements, output, compliationProvider, executorService); } @Override @@ -59,8 +57,7 @@ public class BADStatementExecutor extends QueryTranslator { metadataProvider.setMetadataTxnContext(mdTxnCtx); Identifier dvId = ((DataverseDropStatement) stmt).getDataverseName(); List<Broker> brokers = BADLangExtension.getBrokers(mdTxnCtx, dvId.getValue()); - MetadataProvider tempMdProvider = new MetadataProvider(appCtx, metadataProvider.getDefaultDataverse(), - metadataProvider.getStorageComponentProvider()); + MetadataProvider tempMdProvider = new MetadataProvider(appCtx, metadataProvider.getDefaultDataverse()); tempMdProvider.setConfig(metadataProvider.getConfig()); for (Broker broker : brokers) { tempMdProvider.getLocks().reset(); http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/83f6d53b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java index 1b655da..907bd0e 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java @@ -18,10 +18,9 @@ */ package org.apache.asterix.bad.lang.statement; -import org.apache.asterix.active.ActiveJobNotificationHandler; -import org.apache.asterix.active.ActiveLifecycleListener; import org.apache.asterix.active.EntityId; import org.apache.asterix.algebra.extension.IExtensionStatement; +import org.apache.asterix.app.active.ActiveNotificationHandler; import org.apache.asterix.app.translator.QueryTranslator; import org.apache.asterix.bad.BADConstants; import org.apache.asterix.bad.lang.BADLangExtension; @@ -91,10 +90,9 @@ public class ChannelDropStatement implements IExtensionStatement { boolean txnActive = false; EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverse, channelName.getValue()); ICcApplicationContext appCtx = metadataProvider.getApplicationContext(); - ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener(); - ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler(); - PrecompiledJobEventListener listener = - (PrecompiledJobEventListener) activeEventHandler.getActiveEntityListener(entityId); + ActiveNotificationHandler activeEventHandler = + (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); + PrecompiledJobEventListener listener = (PrecompiledJobEventListener) activeEventHandler.getListener(entityId); Channel channel = null; MetadataTransactionContext mdTxnCtx = null; @@ -115,14 +113,13 @@ public class ChannelDropStatement implements IExtensionStatement { listener.getExecutorService().shutdownNow(); JobId hyracksJobId = listener.getJobId(); listener.deActivate(); - activeEventHandler.removeListener(listener); + activeEventHandler.unregisterListener(listener); if (hyracksJobId != null) { hcc.destroyJob(hyracksJobId); } //Create a metadata provider to use in nested jobs. - MetadataProvider tempMdProvider = new MetadataProvider(appCtx, metadataProvider.getDefaultDataverse(), - metadataProvider.getStorageComponentProvider()); + MetadataProvider tempMdProvider = new MetadataProvider(appCtx, metadataProvider.getDefaultDataverse()); tempMdProvider.setConfig(metadataProvider.getConfig()); //Drop the Channel Datasets //TODO: Need to find some way to handle if this fails. http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/83f6d53b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java index 5bf0690..df8dab1 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java @@ -186,7 +186,7 @@ public class ChannelSubscribeStatement implements IExtensionStatement { subscriptionTuple.setVarCounter(varCounter); MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(), - metadataProvider.getDefaultDataverse(), metadataProvider.getStorageComponentProvider()); + metadataProvider.getDefaultDataverse()); tempMdProvider.setConfig(metadataProvider.getConfig()); if (subscriptionId == null) { http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/83f6d53b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java index 60de69e..28d09df 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java @@ -144,7 +144,7 @@ public class ChannelUnsubscribeStatement implements IExtensionStatement { SqlppDeleteRewriteVisitor visitor = new SqlppDeleteRewriteVisitor(); delete.accept(visitor, null); MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(), - metadataProvider.getDefaultDataverse(), metadataProvider.getStorageComponentProvider()); + metadataProvider.getDefaultDataverse()); tempMdProvider.setConfig(metadataProvider.getConfig()); ((QueryTranslator) statementExecutor).handleDeleteStatement(tempMdProvider, delete, hcc, false); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/83f6d53b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java index 7d91d2e..a43020f 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java @@ -21,15 +21,17 @@ package org.apache.asterix.bad.lang.statement; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.StringReader; -import java.util.*; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.logging.Level; import java.util.logging.Logger; -import org.apache.asterix.active.ActiveJobNotificationHandler; -import org.apache.asterix.active.ActiveLifecycleListener; import org.apache.asterix.active.EntityId; import org.apache.asterix.algebra.extension.IExtensionStatement; +import org.apache.asterix.app.active.ActiveNotificationHandler; import org.apache.asterix.app.translator.QueryTranslator; import org.apache.asterix.bad.BADConstants; import org.apache.asterix.bad.ChannelJobService; @@ -42,6 +44,7 @@ import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.CompilationException; +import org.apache.asterix.common.exceptions.MetadataException; import org.apache.asterix.common.functions.FunctionSignature; import org.apache.asterix.common.metadata.IDataset; import org.apache.asterix.lang.common.base.Expression; @@ -56,7 +59,6 @@ import org.apache.asterix.lang.common.statement.InternalDetailsDecl; import org.apache.asterix.lang.common.statement.SetStatement; import org.apache.asterix.lang.common.struct.Identifier; import org.apache.asterix.lang.common.visitor.base.ILangVisitor; -import org.apache.asterix.metadata.MetadataException; import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.metadata.MetadataTransactionContext; import org.apache.asterix.metadata.declared.MetadataProvider; @@ -67,7 +69,6 @@ import org.apache.asterix.translator.IStatementExecutor.ResultDelivery; import org.apache.asterix.translator.IStatementExecutor.Stats; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.client.IHyracksClientConnection; -import org.apache.hyracks.api.constraints.Constraint; import org.apache.hyracks.api.dataset.IHyracksDataset; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobFlag; @@ -131,11 +132,13 @@ public class CreateChannelStatement implements IExtensionStatement { return channelResultsInsertQuery; } - @Override public byte getCategory() { + @Override + public byte getCategory() { return Category.DDL; } - @Override public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException { + @Override + public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException { return null; } @@ -160,7 +163,8 @@ public class CreateChannelStatement implements IExtensionStatement { } - @Override public byte getKind() { + @Override + public byte getKind() { return Kind.EXTENSION; } @@ -188,14 +192,13 @@ public class CreateChannelStatement implements IExtensionStatement { fieldNames.add(BADConstants.ResultId); partitionFields.add(fieldNames); idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null, false); - DatasetDecl createResultsDataset = - new DatasetDecl(new Identifier(dataverse), resultsName, new Identifier(BADConstants.BAD_DATAVERSE_NAME), - resultsTypeName, null, null, null, null, new HashMap<String, String>(), - new HashMap<String, String>(), DatasetType.INTERNAL, idd, true); + DatasetDecl createResultsDataset = new DatasetDecl(new Identifier(dataverse), resultsName, + new Identifier(BADConstants.BAD_DATAVERSE_NAME), resultsTypeName, null, null, null, null, + new HashMap<String, String>(), new HashMap<String, String>(), DatasetType.INTERNAL, idd, true); //Run both statements to create datasets - ((QueryTranslator) statementExecutor) - .handleCreateDatasetStatement(metadataProvider, createSubscriptionsDataset, hcc); + ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createSubscriptionsDataset, + hcc); metadataProvider.getLocks().reset(); ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createResultsDataset, hcc); @@ -231,9 +234,8 @@ public class CreateChannelStatement implements IExtensionStatement { SetStatement ss = (SetStatement) fStatements.get(0); metadataProvider.getConfig().put(ss.getPropName(), ss.getPropValue()); - return ((QueryTranslator) statementExecutor) - .handleInsertUpsertStatement(metadataProvider, fStatements.get(1), hcc, hdc, ResultDelivery.ASYNC, null, - stats, true, null, null); + return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, fStatements.get(1), + hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null, null); } private void setupExecutorJob(EntityId entityId, JobSpecification channeljobSpec, IHyracksClientConnection hcc, @@ -244,15 +246,15 @@ public class CreateChannelStatement implements IExtensionStatement { if (predistributed) { jobId = hcc.distributeJob(channeljobSpec); } - ScheduledExecutorService ses = ChannelJobService - .startJob(channeljobSpec, EnumSet.noneOf(JobFlag.class), jobId, hcc, - ChannelJobService.findPeriod(duration)); + ScheduledExecutorService ses = ChannelJobService.startJob(channeljobSpec, EnumSet.noneOf(JobFlag.class), + jobId, hcc, ChannelJobService.findPeriod(duration)); listener.storeDistributedInfo(jobId, ses, null); } } - @Override public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider, + @Override + public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider, IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats, int resultSetIdCounter) throws HyracksDataException, AlgebricksException { @@ -271,10 +273,9 @@ public class CreateChannelStatement implements IExtensionStatement { Identifier resultsName = new Identifier(channelName + BADConstants.resultsEnding); EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverse, channelName.getValue()); ICcApplicationContext appCtx = metadataProvider.getApplicationContext(); - ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener(); - ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler(); - PrecompiledJobEventListener listener = - (PrecompiledJobEventListener) activeEventHandler.getActiveEntityListener(entityId); + ActiveNotificationHandler activeEventHandler = + (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); + PrecompiledJobEventListener listener = (PrecompiledJobEventListener) activeEventHandler.getListener(entityId); boolean alreadyActive = false; Channel channel = null; @@ -287,7 +288,7 @@ public class CreateChannelStatement implements IExtensionStatement { throw new AlgebricksException("A channel with this name " + channelName + " already exists."); } if (listener != null) { - alreadyActive = listener.isEntityActive(); + alreadyActive = listener.isActive(); } if (alreadyActive) { throw new AsterixException("Channel " + channelName + " is already running"); @@ -304,15 +305,14 @@ public class CreateChannelStatement implements IExtensionStatement { throw new AsterixException("The channel name:" + channelName + " is not available."); } MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(), - metadataProvider.getDefaultDataverse(), metadataProvider.getStorageComponentProvider()); + metadataProvider.getDefaultDataverse()); tempMdProvider.setConfig(metadataProvider.getConfig()); //Create Channel Datasets createDatasets(statementExecutor, subscriptionsName, resultsName, tempMdProvider, hcc, hdc, dataverse); tempMdProvider.getLocks().reset(); //Create Channel Internal Job - JobSpecification channeljobSpec = - createChannelJob(statementExecutor, subscriptionsName, resultsName, tempMdProvider, hcc, hdc, stats, - dataverse); + JobSpecification channeljobSpec = createChannelJob(statementExecutor, subscriptionsName, resultsName, + tempMdProvider, hcc, hdc, stats, dataverse); // Now we subscribe if (listener == null) { http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/83f6d53b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java index d203905..1c497a8 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java @@ -26,10 +26,9 @@ import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; -import org.apache.asterix.active.ActiveJobNotificationHandler; -import org.apache.asterix.active.ActiveLifecycleListener; import org.apache.asterix.active.EntityId; import org.apache.asterix.algebra.extension.IExtensionStatement; +import org.apache.asterix.app.active.ActiveNotificationHandler; import org.apache.asterix.app.result.ResultReader; import org.apache.asterix.app.translator.QueryTranslator; import org.apache.asterix.bad.BADConstants; @@ -41,6 +40,7 @@ import org.apache.asterix.bad.metadata.Procedure; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.CompilationException; +import org.apache.asterix.common.exceptions.MetadataException; import org.apache.asterix.common.functions.FunctionSignature; import org.apache.asterix.lang.common.base.Expression; import org.apache.asterix.lang.common.base.Statement; @@ -52,7 +52,6 @@ import org.apache.asterix.lang.common.struct.Identifier; import org.apache.asterix.lang.common.struct.VarIdentifier; import org.apache.asterix.lang.common.visitor.base.ILangVisitor; import org.apache.asterix.lang.sqlpp.visitor.SqlppDeleteRewriteVisitor; -import org.apache.asterix.metadata.MetadataException; import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.metadata.MetadataTransactionContext; import org.apache.asterix.metadata.declared.MetadataProvider; @@ -96,7 +95,8 @@ public class CreateProcedureStatement implements IExtensionStatement { this.period = (CallExpr) period; } - @Override public byte getKind() { + @Override + public byte getKind() { return Kind.EXTENSION; } @@ -108,7 +108,8 @@ public class CreateProcedureStatement implements IExtensionStatement { return signature; } - @Override public byte getCategory() { + @Override + public byte getCategory() { return Category.DDL; } @@ -116,7 +117,8 @@ public class CreateProcedureStatement implements IExtensionStatement { return period; } - @Override public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException { + @Override + public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException { return null; } @@ -167,9 +169,10 @@ public class CreateProcedureStatement implements IExtensionStatement { throw new CompilationException("Procedure can only execute a single statement"); } if (fStatements.get(0).getKind() == Statement.Kind.INSERT) { - return new Pair<>(((QueryTranslator) statementExecutor) - .handleInsertUpsertStatement(metadataProvider, fStatements.get(0), hcc, hdc, ResultDelivery.ASYNC, - null, stats, true, null, null), PrecompiledType.INSERT); + return new Pair<>( + ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, + fStatements.get(0), hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null, null), + PrecompiledType.INSERT); } else if (fStatements.get(0).getKind() == Statement.Kind.QUERY) { Pair<JobSpecification, PrecompiledType> pair = new Pair<>(compileQueryJob(statementExecutor, metadataProvider, hcc, (Query) fStatements.get(0)), @@ -179,8 +182,8 @@ public class CreateProcedureStatement implements IExtensionStatement { } else if (fStatements.get(0).getKind() == Statement.Kind.DELETE) { SqlppDeleteRewriteVisitor visitor = new SqlppDeleteRewriteVisitor(); fStatements.get(0).accept(visitor, null); - return new Pair<>(((QueryTranslator) statementExecutor) - .handleDeleteStatement(metadataProvider, fStatements.get(0), hcc, true), PrecompiledType.DELETE); + return new Pair<>(((QueryTranslator) statementExecutor).handleDeleteStatement(metadataProvider, + fStatements.get(0), hcc, true), PrecompiledType.DELETE); } else { throw new CompilationException("Procedure can only execute a single delete, insert, or query"); } @@ -193,18 +196,18 @@ public class CreateProcedureStatement implements IExtensionStatement { listener.storeDistributedInfo(jobId, null, new ResultReader(hdc, jobId, resultSetId)); } - @Override public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider, + @Override + public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider, IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats, int resultSetIdCounter) throws HyracksDataException, AlgebricksException { ICcApplicationContext appCtx = metadataProvider.getApplicationContext(); - ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener(); - ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler(); + ActiveNotificationHandler activeEventHandler = + (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); initialize(); String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(new Identifier(signature.getNamespace())); EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverse, signature.getName()); - PrecompiledJobEventListener listener = - (PrecompiledJobEventListener) activeEventHandler.getActiveEntityListener(entityId); + PrecompiledJobEventListener listener = (PrecompiledJobEventListener) activeEventHandler.getListener(entityId); boolean alreadyActive = false; Procedure procedure = null; @@ -212,13 +215,13 @@ public class CreateProcedureStatement implements IExtensionStatement { try { mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); metadataProvider.setMetadataTxnContext(mdTxnCtx); - procedure = BADLangExtension - .getProcedure(mdTxnCtx, dataverse, signature.getName(), Integer.toString(signature.getArity())); + procedure = BADLangExtension.getProcedure(mdTxnCtx, dataverse, signature.getName(), + Integer.toString(signature.getArity())); if (procedure != null) { throw new AlgebricksException("A procedure with this name " + signature.getName() + " already exists."); } if (listener != null) { - alreadyActive = listener.isEntityActive(); + alreadyActive = listener.isActive(); } if (alreadyActive) { throw new AsterixException("Procedure " + signature.getName() + " is already running"); @@ -228,7 +231,7 @@ public class CreateProcedureStatement implements IExtensionStatement { Function.RETURNTYPE_VOID, getFunctionBody(), Function.LANGUAGE_AQL, duration); MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(), - metadataProvider.getDefaultDataverse(), metadataProvider.getStorageComponentProvider()); + metadataProvider.getDefaultDataverse()); tempMdProvider.setConfig(metadataProvider.getConfig()); metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++)); http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/83f6d53b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java index 9bf3718..e60570c 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java @@ -21,11 +21,10 @@ package org.apache.asterix.bad.lang.statement; import java.util.EnumSet; import java.util.concurrent.ScheduledExecutorService; -import org.apache.asterix.active.ActiveJobNotificationHandler; -import org.apache.asterix.active.ActiveLifecycleListener; import org.apache.asterix.active.EntityId; import org.apache.asterix.algebra.extension.IExtensionStatement; import org.apache.asterix.api.http.server.ResultUtil; +import org.apache.asterix.app.active.ActiveNotificationHandler; import org.apache.asterix.app.result.ResultReader; import org.apache.asterix.app.translator.QueryTranslator; import org.apache.asterix.bad.BADConstants; @@ -95,13 +94,12 @@ public class ExecuteProcedureStatement implements IExtensionStatement { IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats, int resultSetIdCounter) throws HyracksDataException, AlgebricksException { ICcApplicationContext appCtx = metadataProvider.getApplicationContext(); - ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener(); - ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler(); + ActiveNotificationHandler activeEventHandler = + (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(new Identifier(dataverseName)); boolean txnActive = false; EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverse, procedureName); - PrecompiledJobEventListener listener = - (PrecompiledJobEventListener) activeEventHandler.getActiveEntityListener(entityId); + PrecompiledJobEventListener listener = (PrecompiledJobEventListener) activeEventHandler.getListener(entityId); Procedure procedure = null; MetadataTransactionContext mdTxnCtx = null; http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/83f6d53b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java index f7c3a74..a1bf0d3 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java @@ -18,10 +18,9 @@ */ package org.apache.asterix.bad.lang.statement; -import org.apache.asterix.active.ActiveJobNotificationHandler; -import org.apache.asterix.active.ActiveLifecycleListener; import org.apache.asterix.active.EntityId; import org.apache.asterix.algebra.extension.IExtensionStatement; +import org.apache.asterix.app.active.ActiveNotificationHandler; import org.apache.asterix.app.translator.QueryTranslator; import org.apache.asterix.bad.BADConstants; import org.apache.asterix.bad.lang.BADLangExtension; @@ -82,16 +81,15 @@ public class ProcedureDropStatement implements IExtensionStatement { IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats, int resultSetIdCounter) throws HyracksDataException, AlgebricksException { ICcApplicationContext appCtx = metadataProvider.getApplicationContext(); - ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener(); - ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler(); + ActiveNotificationHandler activeEventHandler = + (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); FunctionSignature signature = getFunctionSignature(); String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(new Identifier(signature.getNamespace())); signature.setNamespace(dataverse); boolean txnActive = false; EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverse, signature.getName()); - PrecompiledJobEventListener listener = - (PrecompiledJobEventListener) activeEventHandler.getActiveEntityListener(entityId); + PrecompiledJobEventListener listener = (PrecompiledJobEventListener) activeEventHandler.getListener(entityId); Procedure procedure = null; MetadataTransactionContext mdTxnCtx = null; @@ -115,7 +113,7 @@ public class ProcedureDropStatement implements IExtensionStatement { } JobId hyracksJobId = listener.getJobId(); listener.deActivate(); - activeEventHandler.removeListener(listener); + activeEventHandler.unregisterListener(listener); if (hyracksJobId != null) { hcc.destroyJob(hyracksJobId); } http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/83f6d53b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java index cd4470f..cd2ff86 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java @@ -42,8 +42,8 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; public class BADMetadataExtension implements IMetadataExtension { - public static final ExtensionId BAD_METADATA_EXTENSION_ID = new ExtensionId( - BADConstants.BAD_METADATA_EXTENSION_NAME, 0); + public static final ExtensionId BAD_METADATA_EXTENSION_ID = + new ExtensionId(BADConstants.BAD_METADATA_EXTENSION_NAME, 0); public static final Dataverse BAD_DATAVERSE = new Dataverse(BADConstants.BAD_DATAVERSE_NAME, NonTaggedDataFormat.class.getName(), MetadataUtil.PENDING_NO_OP); http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/83f6d53b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java index d3d2e66..de1aab8 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java @@ -19,8 +19,8 @@ import java.io.ByteArrayInputStream; import java.io.DataInput; import java.io.DataInputStream; +import org.apache.asterix.common.exceptions.MetadataException; import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; -import org.apache.asterix.metadata.MetadataException; import org.apache.asterix.metadata.entitytupletranslators.AbstractTupleTranslator; import org.apache.asterix.om.base.ARecord; import org.apache.asterix.om.base.AString; http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/83f6d53b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java index e6d0249..d577260 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java @@ -19,9 +19,9 @@ import java.io.ByteArrayInputStream; import java.io.DataInput; import java.io.DataInputStream; +import org.apache.asterix.common.exceptions.MetadataException; import org.apache.asterix.common.functions.FunctionSignature; import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; -import org.apache.asterix.metadata.MetadataException; import org.apache.asterix.metadata.entitytupletranslators.AbstractTupleTranslator; import org.apache.asterix.om.base.ARecord; import org.apache.asterix.om.base.AString; http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/83f6d53b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java index 5eb18d1..8e0cf5f 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java @@ -18,27 +18,30 @@ */ package org.apache.asterix.bad.metadata; +import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.concurrent.ScheduledExecutorService; import org.apache.asterix.active.ActiveEvent; +import org.apache.asterix.active.ActiveEvent.Kind; import org.apache.asterix.active.ActivityState; import org.apache.asterix.active.EntityId; -import org.apache.asterix.active.IActiveEventSubscriber; +import org.apache.asterix.active.IActiveEntityEventSubscriber; +import org.apache.asterix.active.IActiveEntityEventsListener; +import org.apache.asterix.active.message.ActivePartitionMessage; import org.apache.asterix.app.result.ResultReader; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.metadata.IDataset; -import org.apache.asterix.external.feed.management.ActiveEntityEventsListener; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobId; +import org.apache.log4j.Level; import org.apache.log4j.Logger; -public class PrecompiledJobEventListener extends ActiveEntityEventsListener { - private static final Logger LOGGER = Logger.getLogger(PrecompiledJobEventListener.class); +public class PrecompiledJobEventListener implements IActiveEntityEventsListener { - private ScheduledExecutorService executorService = null; - private ResultReader resultReader; + private static final Logger LOGGER = Logger.getLogger(PrecompiledJobEventListener.class); public enum PrecompiledType { CHANNEL, @@ -47,15 +50,119 @@ public class PrecompiledJobEventListener extends ActiveEntityEventsListener { DELETE } + enum RequestState { + INIT, + STARTED, + FINISHED + } + + private ScheduledExecutorService executorService = null; + private ResultReader resultReader; private final PrecompiledType type; + // members + protected volatile ActivityState state; + protected JobId jobId; + protected final List<IActiveEntityEventSubscriber> subscribers = new ArrayList<>(); + protected final ICcApplicationContext appCtx; + protected final EntityId entityId; + protected final List<IDataset> datasets; + protected final ActiveEvent statsUpdatedEvent; + protected long statsTimestamp; + protected String stats; + protected RequestState statsRequestState; + protected final String runtimeName; + protected final AlgebricksAbsolutePartitionConstraint locations; + protected int numRegistered; public PrecompiledJobEventListener(ICcApplicationContext appCtx, EntityId entityId, PrecompiledType type, List<IDataset> datasets, AlgebricksAbsolutePartitionConstraint locations, String runtimeName) { - super(appCtx, entityId, datasets, locations, runtimeName); + this.appCtx = appCtx; + this.entityId = entityId; + this.datasets = datasets; + this.state = ActivityState.STOPPED; + this.statsTimestamp = -1; + this.statsRequestState = RequestState.INIT; + this.statsUpdatedEvent = new ActiveEvent(null, Kind.STATS_UPDATED, entityId, null); + this.stats = "{\"Stats\":\"N/A\"}"; + this.runtimeName = runtimeName; + this.locations = locations; + this.numRegistered = 0; state = ActivityState.STOPPED; this.type = type; } + protected synchronized void handle(ActivePartitionMessage message) { + if (message.getEvent() == ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED) { + numRegistered++; + if (numRegistered == locations.getLocations().length) { + state = ActivityState.RUNNING; + } + } + } + + @Override + public EntityId getEntityId() { + return entityId; + } + + @Override + public ActivityState getState() { + return state; + } + + @Override + public boolean isEntityUsingDataset(IDataset dataset) { + return datasets.contains(dataset); + } + + public JobId getJobId() { + return jobId; + } + + @Override + public String getStats() { + return stats; + } + + @Override + public long getStatsTimeStamp() { + return statsTimestamp; + } + + public String formatStats(List<String> responses) { + StringBuilder strBuilder = new StringBuilder(); + strBuilder.append("{\"Stats\": [").append(responses.get(0)); + for (int i = 1; i < responses.size(); i++) { + strBuilder.append(", ").append(responses.get(i)); + } + strBuilder.append("]}"); + return strBuilder.toString(); + } + + protected synchronized void notifySubscribers(ActiveEvent event) { + notifyAll(); + Iterator<IActiveEntityEventSubscriber> it = subscribers.iterator(); + while (it.hasNext()) { + IActiveEntityEventSubscriber subscriber = it.next(); + if (subscriber.isDone()) { + it.remove(); + } else { + try { + subscriber.notify(event); + } catch (HyracksDataException e) { + LOGGER.log(Level.WARN, "Failed to notify subscriber", e); + } + if (subscriber.isDone()) { + it.remove(); + } + } + } + } + + public AlgebricksAbsolutePartitionConstraint getLocations() { + return locations; + } + public ResultReader getResultReader() { return resultReader; } @@ -74,10 +181,6 @@ public class PrecompiledJobEventListener extends ActiveEntityEventsListener { return executorService; } - public boolean isEntityActive() { - return state == ActivityState.STARTED; - } - public void deActivate() { state = ActivityState.STOPPED; } @@ -110,7 +213,7 @@ public class PrecompiledJobEventListener extends ActiveEntityEventsListener { if (LOGGER.isInfoEnabled()) { LOGGER.info("Channel Job started for " + entityId); } - state = ActivityState.STARTED; + state = ActivityState.RUNNING; } private synchronized void handleJobFinishEvent(ActiveEvent message) throws Exception { @@ -120,7 +223,21 @@ public class PrecompiledJobEventListener extends ActiveEntityEventsListener { } @Override - public synchronized void subscribe(IActiveEventSubscriber subscriber) throws HyracksDataException { + public synchronized void subscribe(IActiveEntityEventSubscriber subscriber) throws HyracksDataException { // no op } + + @Override + public boolean isActive() { + return state == ActivityState.RUNNING; + } + + @Override + public void unregister() throws HyracksDataException { + } + + @Override + public Exception getJobFailure() { + return null; + } } http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/83f6d53b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java index f324a46..1aa633f 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java @@ -26,8 +26,8 @@ import java.util.ArrayList; import java.util.List; import org.apache.asterix.builders.OrderedListBuilder; +import org.apache.asterix.common.exceptions.MetadataException; import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; -import org.apache.asterix.metadata.MetadataException; import org.apache.asterix.metadata.entitytupletranslators.AbstractTupleTranslator; import org.apache.asterix.om.base.AOrderedList; import org.apache.asterix.om.base.ARecord;
