Enable dependencies in the metadata for BAD entities. Allow Channels and Procedures to store dependencies on Datasets and Functions.
Prevent dropping of these dependencies Add Error tests Change-Id: Ic6ac2daad03844a042aded8e17bb231a06f59cbe Project: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/commit/5b870653 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/tree/5b870653 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/diff/5b870653 Branch: refs/heads/master Commit: 5b870653b3da64733246b1396d206fe84b8f3890 Parents: c6c98ce Author: Steven Glenn Jacobs <[email protected]> Authored: Fri Jan 19 19:52:51 2018 -0800 Committer: Steven Glenn Jacobs <[email protected]> Committed: Fri Jan 19 19:52:51 2018 -0800 ---------------------------------------------------------------------- .../org/apache/asterix/bad/BADConstants.java | 1 + .../asterix/bad/lang/BADLangExtension.java | 12 ++ .../asterix/bad/lang/BADStatementExecutor.java | 160 +++++++++++++++++-- .../lang/statement/ChannelDropStatement.java | 6 +- .../lang/statement/CreateChannelStatement.java | 63 +++----- .../statement/CreateProcedureStatement.java | 40 +++-- .../bad/metadata/AllChannelsSearchKey.java | 40 +++++ .../bad/metadata/AllProceduresSearchKey.java | 40 +++++ .../bad/metadata/BADMetadataRecordTypes.java | 16 +- .../apache/asterix/bad/metadata/Channel.java | 40 ++++- .../bad/metadata/ChannelTupleTranslator.java | 107 ++++++++++--- .../metadata/DeployedJobSpecEventListener.java | 6 +- .../apache/asterix/bad/metadata/Procedure.java | 22 ++- .../bad/metadata/ProcedureTupleTranslator.java | 62 ++++++- .../src/main/resources/lang-extension/lang.txt | 6 + .../drop_function/drop_function.1.ddl.sqlpp | 57 +++++++ .../drop_function_dataverse.1.ddl.sqlpp | 56 +++++++ .../drop_results/drop_results.1.ddl.sqlpp | 57 +++++++ .../drop_subscriptions.1.ddl.sqlpp | 57 +++++++ .../room_occupants/room_occupants.1.ddl.sqlpp | 1 + .../create_procedure_check_metadata.1.ddl.sqlpp | 93 +++++++++++ ...reate_procedure_check_metadata.2.query.sqlpp | 22 +++ 
.../delete_procedure.1.ddl.sqlpp | 1 + .../delete_procedure_drop_dataset.1.ddl.sqlpp | 48 ++++++ .../delete_procedure_drop_function.1.ddl.sqlpp | 56 +++++++ .../delete_procedure_drop_index.1.ddl.sqlpp | 50 ++++++ .../insert_procedure_drop_dataset.1.ddl.sqlpp | 50 ++++++ .../insert_procedure_drop_dataverse.1.ddl.sqlpp | 49 ++++++ .../query_procedure_drop_dataset.1.ddl.sqlpp | 49 ++++++ .../query_procedure_drop_function.1.ddl.sqlpp | 59 +++++++ .../repetitive_insert_procedure.1.ddl.sqlpp | 1 + .../create_channel_check_metadata.1.adm | 2 +- .../drop_channel_check_metadata.1.adm | 4 +- .../create_procedure_check_metadata.1.adm | 6 + .../src/test/resources/runtimets/testsuite.xml | 81 +++++++++- 35 files changed, 1317 insertions(+), 103 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/5b870653/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java b/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java index 3aca099..d2d0fa3 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java @@ -45,6 +45,7 @@ public interface BADConstants { String Duration = "Duration"; String Function = "Function"; String FIELD_NAME_ARITY = "Arity"; + String FIELD_NAME_DEPENDENCIES = "Dependencies"; String FIELD_NAME_PARAMS = "Params"; String FIELD_NAME_RETURN_TYPE = "ReturnType"; String FIELD_NAME_DEFINITION = "Definition"; http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/5b870653/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADLangExtension.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADLangExtension.java 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADLangExtension.java index fa0c0da..7a085a3 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADLangExtension.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADLangExtension.java @@ -21,6 +21,8 @@ package org.apache.asterix.bad.lang; import java.util.List; import org.apache.asterix.algebra.base.ILangExtension; +import org.apache.asterix.bad.metadata.AllChannelsSearchKey; +import org.apache.asterix.bad.metadata.AllProceduresSearchKey; import org.apache.asterix.bad.metadata.Broker; import org.apache.asterix.bad.metadata.BrokerSearchKey; import org.apache.asterix.bad.metadata.Channel; @@ -111,6 +113,11 @@ public class BADLangExtension implements ILangExtension { return MetadataManager.INSTANCE.getEntities(mdTxnCtx, brokerSearchKey); } + public static List<Channel> getAllChannels(MetadataTransactionContext mdTxnCtx) throws AlgebricksException { + AllChannelsSearchKey channelSearchKey = new AllChannelsSearchKey(); + return MetadataManager.INSTANCE.getEntities(mdTxnCtx, channelSearchKey); + } + public static List<Channel> getChannels(MetadataTransactionContext mdTxnCtx, String dataverseName) throws AlgebricksException { DataverseChannelsSearchKey channelSearchKey = new DataverseChannelsSearchKey(dataverseName); @@ -123,4 +130,9 @@ public class BADLangExtension implements ILangExtension { return MetadataManager.INSTANCE.getEntities(mdTxnCtx, proceduresSearchKey); } + public static List<Procedure> getAllProcedures(MetadataTransactionContext mdTxnCtx) throws AlgebricksException { + AllProceduresSearchKey proceduresSearchKey = new AllProceduresSearchKey(); + return MetadataManager.INSTANCE.getEntities(mdTxnCtx, proceduresSearchKey); + } + } http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/5b870653/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java ---------------------------------------------------------------------- diff --git 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java index 28f7f00..8c7143f 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java @@ -21,8 +21,8 @@ package org.apache.asterix.bad.lang; import java.util.List; import java.util.concurrent.ExecutorService; -import org.apache.asterix.app.translator.RequestParameters; import org.apache.asterix.app.translator.QueryTranslator; +import org.apache.asterix.app.translator.RequestParameters; import org.apache.asterix.bad.lang.statement.BrokerDropStatement; import org.apache.asterix.bad.lang.statement.ChannelDropStatement; import org.apache.asterix.bad.lang.statement.ProcedureDropStatement; @@ -30,16 +30,21 @@ import org.apache.asterix.bad.metadata.Broker; import org.apache.asterix.bad.metadata.Channel; import org.apache.asterix.bad.metadata.Procedure; import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.functions.FunctionSignature; import org.apache.asterix.compiler.provider.ILangCompilationProvider; import org.apache.asterix.lang.common.base.Statement; import org.apache.asterix.lang.common.statement.DataverseDropStatement; +import org.apache.asterix.lang.common.statement.DropDatasetStatement; +import org.apache.asterix.lang.common.statement.FunctionDropStatement; +import org.apache.asterix.lang.common.statement.IndexDropStatement; import org.apache.asterix.lang.common.struct.Identifier; import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.metadata.MetadataTransactionContext; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.translator.IRequestParameters; import org.apache.asterix.translator.SessionOutput; +import 
org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.client.IHyracksClientConnection; public class BADStatementExecutor extends QueryTranslator { @@ -49,39 +54,170 @@ public class BADStatementExecutor extends QueryTranslator { super(appCtx, statements, output, compliationProvider, executorService); } + //TODO: Most of this file could go away if we had metadata dependencies + + private void checkIfDatasetIsInUse(MetadataTransactionContext mdTxnCtx, String dataverse, String dataset) + throws CompilationException, AlgebricksException { + List<Channel> channels = BADLangExtension.getAllChannels(mdTxnCtx); + for (Channel channel : channels) { + List<List<List<String>>> dependencies = channel.getDependencies(); + List<List<String>> datasetDependencies = dependencies.get(0); + for (List<String> dependency : datasetDependencies) { + if (dependency.get(0).equals(dataverse) && dependency.get(1).equals(dataset)) { + throw new CompilationException("Cannot alter dataset " + dataverse + "." + dataset + ". " + + channel.getChannelId() + " depends on it!"); + } + } + + } + List<Procedure> procedures = BADLangExtension.getAllProcedures(mdTxnCtx); + for (Procedure procedure : procedures) { + List<List<List<String>>> dependencies = procedure.getDependencies(); + List<List<String>> datasetDependencies = dependencies.get(0); + for (List<String> dependency : datasetDependencies) { + if (dependency.get(0).equals(dataverse) && dependency.get(1).equals(dataset)) { + throw new CompilationException("Cannot alter dataset " + dataverse + "." + dataset + ". 
" + + procedure.getEntityId() + " depends on it!"); + } + } + + } + } + + @Override + public void handleDatasetDropStatement(MetadataProvider metadataProvider, Statement stmt, + IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception { + MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); + metadataProvider.setMetadataTxnContext(mdTxnCtx); + String dvId = getActiveDataverse(((DropDatasetStatement) stmt).getDataverseName()); + Identifier dsId = ((DropDatasetStatement) stmt).getDatasetName(); + + checkIfDatasetIsInUse(mdTxnCtx, dvId, dsId.getValue()); + + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); + super.handleDatasetDropStatement(metadataProvider, stmt, hcc, requestParameters); + } + + @Override + protected void handleIndexDropStatement(MetadataProvider metadataProvider, Statement stmt, + IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception { + MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); + metadataProvider.setMetadataTxnContext(mdTxnCtx); + String dvId = getActiveDataverse(((IndexDropStatement) stmt).getDataverseName()); + Identifier dsId = ((IndexDropStatement) stmt).getDatasetName(); + + checkIfDatasetIsInUse(mdTxnCtx, dvId, dsId.getValue()); + + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); + super.handleIndexDropStatement(metadataProvider, stmt, hcc, requestParameters); + } + + @Override + protected void handleFunctionDropStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception { + MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); + metadataProvider.setMetadataTxnContext(mdTxnCtx); + FunctionSignature sig = ((FunctionDropStatement) stmt).getFunctionSignature(); + + String dvId = getActiveDataverseName(sig.getNamespace()); + String function = sig.getName(); + String arity = Integer.toString(sig.getArity()); + + List<Channel> channels = 
BADLangExtension.getAllChannels(mdTxnCtx); + for (Channel channel : channels) { + List<List<List<String>>> dependencies = channel.getDependencies(); + List<List<String>> datasetDependencies = dependencies.get(1); + for (List<String> dependency : datasetDependencies) { + if (dependency.get(0).equals(dvId) && dependency.get(1).equals(function) + && dependency.get(2).equals(arity)) { + throw new CompilationException( + "Cannot drop function " + sig + ". " + channel.getChannelId() + " depends on it!"); + } + } + + } + List<Procedure> procedures = BADLangExtension.getAllProcedures(mdTxnCtx); + for (Procedure procedure : procedures) { + List<List<List<String>>> dependencies = procedure.getDependencies(); + List<List<String>> datasetDependencies = dependencies.get(1); + for (List<String> dependency : datasetDependencies) { + if (dependency.get(0).equals(dvId) && dependency.get(1).equals(function) + && dependency.get(2).equals(arity)) { + throw new CompilationException( + "Cannot drop function " + sig + ". 
" + procedure.getEntityId() + " depends on it!"); + } + } + + } + + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); + super.handleFunctionDropStatement(metadataProvider, stmt); + } + @Override protected void handleDataverseDropStatement(MetadataProvider metadataProvider, Statement stmt, IHyracksClientConnection hcc) throws Exception { - //TODO: Remove this when metadata dependencies are in place - //TODO: Stop dataset drop when dataset used by channel - super.handleDataverseDropStatement(metadataProvider, stmt, hcc); MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); metadataProvider.setMetadataTxnContext(mdTxnCtx); Identifier dvId = ((DataverseDropStatement) stmt).getDataverseName(); - List<Broker> brokers = BADLangExtension.getBrokers(mdTxnCtx, dvId.getValue()); MetadataProvider tempMdProvider = new MetadataProvider(appCtx, metadataProvider.getDefaultDataverse()); tempMdProvider.getConfig().putAll(metadataProvider.getConfig()); - final IRequestParameters requestParameters = new RequestParameters(null, null, null, null, null, null); - for (Broker broker : brokers) { - tempMdProvider.getLocks().reset(); - BrokerDropStatement drop = new BrokerDropStatement(dvId, new Identifier(broker.getBrokerName()), false); - drop.handle(hcc, this, requestParameters, tempMdProvider, 0); + List<Channel> channels = BADLangExtension.getAllChannels(mdTxnCtx); + for (Channel channel : channels) { + if (channel.getChannelId().getDataverse().equals(dvId.getValue())) { + continue; + } + List<List<List<String>>> dependencies = channel.getDependencies(); + for (List<List<String>> dependencyList : dependencies) { + for (List<String> dependency : dependencyList) { + if (dependency.get(0).equals(dvId.getValue())) { + throw new CompilationException("Cannot drop dataverse " + dvId.getValue() + ". 
" + + channel.getChannelId() + " depends on it!"); + } + } + } + } + List<Procedure> procedures = BADLangExtension.getAllProcedures(mdTxnCtx); + for (Procedure procedure : procedures) { + if (procedure.getEntityId().getDataverse().equals(dvId.getValue())) { + continue; + } + List<List<List<String>>> dependencies = procedure.getDependencies(); + for (List<List<String>> dependencyList : dependencies) { + for (List<String> dependency : dependencyList) { + if (dependency.get(0).equals(dvId.getValue())) { + throw new CompilationException("Cannot drop dataverse " + dvId.getValue() + ". " + + procedure.getEntityId() + " depends on it!"); + } + } + } } - List<Channel> channels = BADLangExtension.getChannels(mdTxnCtx, dvId.getValue()); + final IRequestParameters requestParameters = new RequestParameters(null, null, null, null, null, null); for (Channel channel : channels) { + if (!channel.getChannelId().getDataverse().equals(dvId.getValue())) { + continue; + } tempMdProvider.getLocks().reset(); ChannelDropStatement drop = new ChannelDropStatement(dvId, new Identifier(channel.getChannelId().getEntityName()), false); drop.handle(hcc, this, requestParameters, tempMdProvider, 0); } - List<Procedure> procedures = BADLangExtension.getProcedures(mdTxnCtx, dvId.getValue()); for (Procedure procedure : procedures) { + if (!procedure.getEntityId().getDataverse().equals(dvId.getValue())) { + continue; + } tempMdProvider.getLocks().reset(); ProcedureDropStatement drop = new ProcedureDropStatement(new FunctionSignature(dvId.getValue(), procedure.getEntityId().getEntityName(), procedure.getArity()), false); drop.handle(hcc, this, requestParameters, tempMdProvider, 0); } + List<Broker> brokers = BADLangExtension.getBrokers(mdTxnCtx, dvId.getValue()); + for (Broker broker : brokers) { + tempMdProvider.getLocks().reset(); + BrokerDropStatement drop = new BrokerDropStatement(dvId, new Identifier(broker.getBrokerName()), false); + drop.handle(hcc, this, requestParameters, tempMdProvider, 0); + 
} MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); + super.handleDataverseDropStatement(metadataProvider, stmt, hcc); } } http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/5b870653/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java index 80355c0..e4b6d89 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java @@ -139,8 +139,9 @@ public class ChannelDropStatement implements IExtensionStatement { tempMdProvider.getConfig().putAll(metadataProvider.getConfig()); //Drop the Channel Datasets //TODO: Need to find some way to handle if this fails. 
- //TODO: Prevent datasets for Channels from being dropped elsewhere + //Remove the Channel Metadata + MetadataManager.INSTANCE.deleteEntity(mdTxnCtx, channel); DropDatasetStatement dropStmt = new DropDatasetStatement(new Identifier(dataverse), new Identifier(channel.getResultsDatasetName()), true); ((QueryTranslator) statementExecutor).handleDatasetDropStatement(tempMdProvider, dropStmt, hcc, null); @@ -148,9 +149,6 @@ public class ChannelDropStatement implements IExtensionStatement { dropStmt = new DropDatasetStatement(new Identifier(dataverse), new Identifier(channel.getSubscriptionsDataset()), true); ((QueryTranslator) statementExecutor).handleDatasetDropStatement(tempMdProvider, dropStmt, hcc, null); - - //Remove the Channel Metadata - MetadataManager.INSTANCE.deleteEntity(mdTxnCtx, channel); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); } catch (Exception e) { e.printStackTrace(); http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/5b870653/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java index 3864248..feaa3ca 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java @@ -47,7 +47,6 @@ import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.MetadataException; import org.apache.asterix.common.functions.FunctionSignature; -import org.apache.asterix.common.metadata.IDataset; import org.apache.asterix.lang.common.base.Expression; import org.apache.asterix.lang.common.base.Statement; 
import org.apache.asterix.lang.common.expression.CallExpr; @@ -83,15 +82,15 @@ import org.apache.hyracks.dataflow.common.data.parsers.IValueParser; public class CreateChannelStatement implements IExtensionStatement { private static final Logger LOGGER = Logger.getLogger(CreateChannelStatement.class.getName()); - - private final Identifier dataverseName; private final Identifier channelName; private final FunctionSignature function; private final CallExpr period; + private Identifier dataverseName; private String duration; private InsertStatement channelResultsInsertQuery; private String subscriptionsTableName; private String resultsTableName; + private String dataverse; public CreateChannelStatement(Identifier dataverseName, Identifier channelName, FunctionSignature function, Expression period) { @@ -144,7 +143,7 @@ public class CreateChannelStatement implements IExtensionStatement { return null; } - public void initialize(MetadataTransactionContext mdTxnCtx, String subscriptionsTableName, String resultsTableName) + public void initialize(MetadataTransactionContext mdTxnCtx) throws AlgebricksException, HyracksDataException { Function lookup = MetadataManager.INSTANCE.getFunction(mdTxnCtx, function); if (lookup == null) { @@ -160,8 +159,6 @@ public class CreateChannelStatement implements IExtensionStatement { ByteArrayOutputStream bos = new ByteArrayOutputStream(); DataOutputStream outputStream = new DataOutputStream(bos); durationParser.parse(duration.toCharArray(), 0, duration.toCharArray().length, outputStream); - this.resultsTableName = resultsTableName; - this.subscriptionsTableName = subscriptionsTableName; } @Override @@ -169,9 +166,8 @@ public class CreateChannelStatement implements IExtensionStatement { return Kind.EXTENSION; } - private void createDatasets(IStatementExecutor statementExecutor, Identifier subscriptionsName, - Identifier resultsName, MetadataProvider metadataProvider, IHyracksClientConnection hcc, - IHyracksDataset hdc, String dataverse) 
throws AsterixException, Exception { + private void createDatasets(IStatementExecutor statementExecutor, MetadataProvider metadataProvider, + IHyracksClientConnection hcc) throws AsterixException, Exception { Identifier subscriptionsTypeName = new Identifier(BADConstants.ChannelSubscriptionsType); Identifier resultsTypeName = new Identifier(BADConstants.ChannelResultsType); @@ -183,7 +179,7 @@ public class CreateChannelStatement implements IExtensionStatement { fieldNames.add(BADConstants.SubscriptionId); partitionFields.add(fieldNames); IDatasetDetailsDecl idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null); - DatasetDecl createSubscriptionsDataset = new DatasetDecl(new Identifier(dataverse), subscriptionsName, + DatasetDecl createSubscriptionsDataset = new DatasetDecl(dataverseName, new Identifier(subscriptionsTableName), new Identifier(BADConstants.BAD_DATAVERSE_NAME), subscriptionsTypeName, null, null, null, new HashMap<String, String>(), DatasetType.INTERNAL, idd, null, true); @@ -193,15 +189,15 @@ public class CreateChannelStatement implements IExtensionStatement { fieldNames.add(BADConstants.ResultId); partitionFields.add(fieldNames); idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null); - DatasetDecl createResultsDataset = new DatasetDecl(new Identifier(dataverse), resultsName, + DatasetDecl createResultsDataset = new DatasetDecl(dataverseName, new Identifier(resultsTableName), new Identifier(BADConstants.BAD_DATAVERSE_NAME), resultsTypeName, null, null, null, new HashMap<String, String>(), DatasetType.INTERNAL, idd, null, true); //Create an index on timestamp for results CreateIndexStatement createTimeIndex = new CreateIndexStatement(); - createTimeIndex.setDatasetName(resultsName); - createTimeIndex.setDataverseName(new Identifier(dataverse)); - createTimeIndex.setIndexName(new Identifier(resultsName + "TimeIndex")); + createTimeIndex.setDatasetName(new Identifier(resultsTableName)); + 
createTimeIndex.setDataverseName(dataverseName); + createTimeIndex.setIndexName(new Identifier(resultsTableName + "TimeIndex")); createTimeIndex.setIfNotExists(false); createTimeIndex.setIndexType(IndexType.BTREE); createTimeIndex.setEnforced(false); @@ -226,18 +222,17 @@ public class CreateChannelStatement implements IExtensionStatement { } - private JobSpecification createChannelJob(IStatementExecutor statementExecutor, Identifier subscriptionsName, - Identifier resultsName, MetadataProvider metadataProvider, IHyracksClientConnection hcc, - IHyracksDataset hdc, Stats stats, String dataverse) throws Exception { + private JobSpecification createChannelJob(IStatementExecutor statementExecutor, MetadataProvider metadataProvider, + IHyracksClientConnection hcc, IHyracksDataset hdc, Stats stats) throws Exception { StringBuilder builder = new StringBuilder(); builder.append("SET inline_with \"false\";\n"); - builder.append("insert into " + dataverse + "." + resultsName); + builder.append("insert into " + dataverse + "." + resultsTableName); builder.append(" as a (\n" + "with " + BADConstants.ChannelExecutionTime + " as current_datetime() \n"); builder.append("select result, "); builder.append(BADConstants.ChannelExecutionTime + ", "); builder.append("sub." + BADConstants.SubscriptionId + " as " + BADConstants.SubscriptionId + ","); builder.append("current_datetime() as " + BADConstants.DeliveryTime + "\n"); - builder.append("from " + dataverse + "." + subscriptionsName + " sub,\n"); + builder.append("from " + dataverse + "." + subscriptionsTableName + " sub,\n"); builder.append(BADConstants.BAD_DATAVERSE_NAME + "." + BADConstants.BROKER_KEYWORD + " b, \n"); builder.append(function.getNamespace() + "." + function.getName() + "("); int i = 0; @@ -281,13 +276,12 @@ public class CreateChannelStatement implements IExtensionStatement { //3. 
Create the metadata entry for the channel //TODO: Figure out how to handle when a subset of the 3 tasks fails - //TODO: The compiled job will break if anything changes on the function or two datasets - // Need to make sure we do proper checking when altering these things - String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName); + dataverseName = new Identifier(((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName)); + dataverse = dataverseName.getValue(); + subscriptionsTableName = channelName + BADConstants.subscriptionEnding; + resultsTableName = channelName + BADConstants.resultsEnding; - Identifier subscriptionsName = new Identifier(channelName + BADConstants.subscriptionEnding); - Identifier resultsName = new Identifier(channelName + BADConstants.resultsEnding); EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverse, channelName.getValue()); ICcApplicationContext appCtx = metadataProvider.getApplicationContext(); ActiveNotificationHandler activeEventHandler = @@ -310,15 +304,13 @@ public class CreateChannelStatement implements IExtensionStatement { if (alreadyActive) { throw new AsterixException("Channel " + channelName + " is already running"); } - initialize(mdTxnCtx, subscriptionsName.getValue(), resultsName.getValue()); - channel = new Channel(dataverse, channelName.getValue(), subscriptionsTableName, resultsTableName, function, - duration); + initialize(mdTxnCtx); //check if names are available before creating anything - if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, subscriptionsName.getValue()) != null) { + if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, subscriptionsTableName) != null) { throw new AsterixException("The channel name:" + channelName + " is not available."); } - if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, resultsName.getValue()) != null) { + if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, resultsTableName) 
!= null) { throw new AsterixException("The channel name:" + channelName + " is not available."); } MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(), @@ -327,24 +319,21 @@ public class CreateChannelStatement implements IExtensionStatement { final IHyracksDataset hdc = requestContext.getHyracksDataset(); final Stats stats = requestContext.getStats(); //Create Channel Datasets - createDatasets(statementExecutor, subscriptionsName, resultsName, tempMdProvider, hcc, hdc, dataverse); + createDatasets(statementExecutor, tempMdProvider, hcc); tempMdProvider.getLocks().reset(); //Create Channel Internal Job - JobSpecification channeljobSpec = createChannelJob(statementExecutor, subscriptionsName, resultsName, - tempMdProvider, hcc, hdc, stats, dataverse); + JobSpecification channeljobSpec = createChannelJob(statementExecutor, tempMdProvider, hcc, hdc, stats); // Now we subscribe if (listener == null) { - List<IDataset> datasets = new ArrayList<>(); - datasets.add(MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, subscriptionsName.getValue())); - datasets.add(MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, resultsName.getValue())); - //TODO: Add datasets used by channel function - listener = new DeployedJobSpecEventListener(appCtx, entityId, PrecompiledType.CHANNEL, datasets, null, + listener = new DeployedJobSpecEventListener(appCtx, entityId, PrecompiledType.CHANNEL, null, "BadListener"); activeEventHandler.registerListener(listener); } setupExecutorJob(entityId, channeljobSpec, hcc, listener); + channel = new Channel(dataverse, channelName.getValue(), subscriptionsTableName, resultsTableName, function, + duration, null); MetadataManager.INSTANCE.addEntity(mdTxnCtx, channel); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/5b870653/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java 
---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java index b93f778..cd60b1a 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java @@ -21,6 +21,7 @@ package org.apache.asterix.bad.lang.statement; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; @@ -47,11 +48,14 @@ import org.apache.asterix.lang.common.expression.LiteralExpr; import org.apache.asterix.lang.common.expression.VariableExpr; import org.apache.asterix.lang.common.literal.StringLiteral; import org.apache.asterix.lang.common.statement.DeleteStatement; +import org.apache.asterix.lang.common.statement.InsertStatement; import org.apache.asterix.lang.common.statement.Query; import org.apache.asterix.lang.common.struct.Identifier; import org.apache.asterix.lang.common.struct.VarIdentifier; +import org.apache.asterix.lang.common.util.FunctionUtil; import org.apache.asterix.lang.common.visitor.base.ILangVisitor; import org.apache.asterix.lang.sqlpp.expression.SelectExpression; +import org.apache.asterix.lang.sqlpp.rewrites.SqlppRewriterFactory; import org.apache.asterix.lang.sqlpp.visitor.SqlppDeleteRewriteVisitor; import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.metadata.MetadataTransactionContext; @@ -85,6 +89,7 @@ public class CreateProcedureStatement implements IExtensionStatement { private final List<VariableExpr> varList; private final CallExpr period; private String duration = ""; + private List<List<List<String>>> dependencies; public 
CreateProcedureStatement(FunctionSignature signature, List<VarIdentifier> parameterList, List<Integer> paramIds, String functionBody, Statement procedureBodyStatement, Expression period) { @@ -98,6 +103,9 @@ public class CreateProcedureStatement implements IExtensionStatement { this.varList.add(new VariableExpr(new VarIdentifier(parameterList.get(i).toString(), paramIds.get(i)))); } this.period = (CallExpr) period; + this.dependencies = new ArrayList<>(); + this.dependencies.add(new ArrayList<>()); + this.dependencies.add(new ArrayList<>()); } public String getProcedureBody() { @@ -195,6 +203,10 @@ public class CreateProcedureStatement implements IExtensionStatement { if (!varList.isEmpty()) { throw new CompilationException("Insert procedures cannot have parameters"); } + InsertStatement insertStatement = (InsertStatement) getProcedureBodyStatement(); + dependencies.get(0).add(Arrays.asList( + ((QueryTranslator) statementExecutor).getActiveDataverse(insertStatement.getDataverseName()), + insertStatement.getDatasetName().getValue())); return new Pair<>( ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, getProcedureBodyStatement(), hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null), @@ -202,9 +214,14 @@ public class CreateProcedureStatement implements IExtensionStatement { } else if (getProcedureBodyStatement().getKind() == Statement.Kind.QUERY) { Query s = (Query) getProcedureBodyStatement(); addLets((SelectExpression) s.getBody()); + SqlppRewriterFactory fact = new SqlppRewriterFactory(); + dependencies.get(1).addAll(FunctionUtil.getFunctionDependencies(fact.createQueryRewriter(), + ((Query) getProcedureBodyStatement()).getBody(), metadataProvider).get(1)); Pair<JobSpecification, PrecompiledType> pair = new Pair<>( compileQueryJob(statementExecutor, metadataProvider, hcc, (Query) getProcedureBodyStatement()), PrecompiledType.QUERY); + dependencies.get(0).addAll(FunctionUtil.getFunctionDependencies(fact.createQueryRewriter(), 
+ ((Query) getProcedureBodyStatement()).getBody(), metadataProvider).get(0)); metadataProvider.getLocks().unlock(); return pair; } else if (getProcedureBodyStatement().getKind() == Statement.Kind.DELETE) { @@ -212,8 +229,15 @@ public class CreateProcedureStatement implements IExtensionStatement { getProcedureBodyStatement().accept(visitor, null); DeleteStatement delete = (DeleteStatement) getProcedureBodyStatement(); addLets((SelectExpression) delete.getQuery().getBody()); - return new Pair<>(((QueryTranslator) statementExecutor).handleDeleteStatement(metadataProvider, + + SqlppRewriterFactory fact = new SqlppRewriterFactory(); + dependencies = FunctionUtil.getFunctionDependencies(fact.createQueryRewriter(), delete.getQuery().getBody(), + metadataProvider); + + Pair<JobSpecification, PrecompiledType> pair = + new Pair<>(((QueryTranslator) statementExecutor).handleDeleteStatement(metadataProvider, getProcedureBodyStatement(), hcc, true), PrecompiledType.DELETE); + return pair; } else { throw new CompilationException("Procedure can only execute a single delete, insert, or query"); } @@ -256,8 +280,6 @@ public class CreateProcedureStatement implements IExtensionStatement { if (alreadyActive) { throw new AsterixException("Procedure " + signature.getName() + " is already running"); } - procedure = new Procedure(dataverse, signature.getName(), signature.getArity(), getParamList(), - Function.RETURNTYPE_VOID, getProcedureBody(), Function.LANGUAGE_AQL, duration); MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(), metadataProvider.getDefaultDataverse()); tempMdProvider.getConfig().putAll(metadataProvider.getConfig()); @@ -279,16 +301,16 @@ public class CreateProcedureStatement implements IExtensionStatement { // Now we subscribe if (listener == null) { - //TODO: Add datasets used by channel function - listener = new DeployedJobSpecEventListener(appCtx, entityId, procedureJobSpec.second, - new ArrayList<>(), - null, 
"BadListener"); + listener = new DeployedJobSpecEventListener(appCtx, entityId, procedureJobSpec.second, null, + "BadListener"); activeEventHandler.registerListener(listener); } - setupDeployedJobSpec(entityId, procedureJobSpec.first, hcc, listener, tempMdProvider.getResultSetId(), - hdc, + setupDeployedJobSpec(entityId, procedureJobSpec.first, hcc, listener, tempMdProvider.getResultSetId(), hdc, stats); + procedure = new Procedure(dataverse, signature.getName(), signature.getArity(), getParamList(), + Function.RETURNTYPE_VOID, getProcedureBody(), Function.LANGUAGE_AQL, duration, dependencies); + MetadataManager.INSTANCE.addEntity(mdTxnCtx, procedure); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/5b870653/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/AllChannelsSearchKey.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/AllChannelsSearchKey.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/AllChannelsSearchKey.java new file mode 100644 index 0000000..62f16c7 --- /dev/null +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/AllChannelsSearchKey.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.bad.metadata; + +import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId; +import org.apache.asterix.metadata.api.IExtensionMetadataSearchKey; +import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; + +public class AllChannelsSearchKey implements IExtensionMetadataSearchKey { + private static final long serialVersionUID = 1L; + + public AllChannelsSearchKey() { + } + + @Override + public ExtensionMetadataDatasetId getDatasetId() { + return BADMetadataIndexes.BAD_CHANNEL_INDEX_ID; + } + + @Override + public ITupleReference getSearchKey() { + return null; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/5b870653/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/AllProceduresSearchKey.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/AllProceduresSearchKey.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/AllProceduresSearchKey.java new file mode 100644 index 0000000..6b995fb --- /dev/null +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/AllProceduresSearchKey.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.bad.metadata; + +import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId; +import org.apache.asterix.metadata.api.IExtensionMetadataSearchKey; +import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; + +public class AllProceduresSearchKey implements IExtensionMetadataSearchKey { + private static final long serialVersionUID = 1L; + + public AllProceduresSearchKey() { + } + + @Override + public ExtensionMetadataDatasetId getDatasetId() { + return BADMetadataIndexes.BAD_PROCEDURE_INDEX_ID; + } + + @Override + public ITupleReference getSearchKey() { + return null; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/5b870653/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java index 0430118..526e091 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java @@ -49,15 +49,19 @@ public class BADMetadataRecordTypes { public static final int CHANNEL_ARECORD_RESULTS_NAME_FIELD_INDEX = 3; public static final 
int CHANNEL_ARECORD_FUNCTION_FIELD_INDEX = 4; public static final int CHANNEL_ARECORD_DURATION_FIELD_INDEX = 5; + public static final int CHANNEL_ARECORD_DEPENDENCIES_FIELD_INDEX = 6; public static final ARecordType CHANNEL_RECORDTYPE = MetadataRecordTypes.createRecordType( // RecordTypeName BADConstants.RECORD_TYPENAME_CHANNEL, // FieldNames new String[] { BADConstants.DataverseName, BADConstants.ChannelName, BADConstants.SubscriptionsDatasetName, - BADConstants.ResultsDatasetName, BADConstants.Function, BADConstants.Duration }, + BADConstants.ResultsDatasetName, BADConstants.Function, BADConstants.Duration, + BADConstants.FIELD_NAME_DEPENDENCIES }, // FieldTypes new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, - BuiltinType.ASTRING, BuiltinType.ASTRING }, + new AOrderedListType(BuiltinType.ASTRING, null), BuiltinType.ASTRING, + new AOrderedListType(new AOrderedListType(new AOrderedListType(BuiltinType.ASTRING, null), null), + null) }, //IsOpen? 
true); //------------------------------------------ Broker ----------------------------------------// @@ -84,17 +88,21 @@ public class BADMetadataRecordTypes { public static final int PROCEDURE_ARECORD_PROCEDURE_DEFINITION_FIELD_INDEX = 5; public static final int PROCEDURE_ARECORD_PROCEDURE_LANGUAGE_FIELD_INDEX = 6; public static final int PROCEDURE_ARECORD_PROCEDURE_DURATION_FIELD_INDEX = 7; + public static final int PROCEDURE_ARECORD_DEPENDENCIES_FIELD_INDEX = 8; public static final ARecordType PROCEDURE_RECORDTYPE = MetadataRecordTypes.createRecordType( // RecordTypeName BADConstants.RECORD_TYPENAME_PROCEDURE, // FieldNames new String[] { BADConstants.DataverseName, BADConstants.ProcedureName, BADConstants.FIELD_NAME_ARITY, BADConstants.FIELD_NAME_PARAMS, BADConstants.FIELD_NAME_RETURN_TYPE, - BADConstants.FIELD_NAME_DEFINITION, BADConstants.FIELD_NAME_LANGUAGE, BADConstants.Duration }, + BADConstants.FIELD_NAME_DEFINITION, BADConstants.FIELD_NAME_LANGUAGE, BADConstants.Duration, + BADConstants.FIELD_NAME_DEPENDENCIES }, // FieldTypes new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, new AOrderedListType(BuiltinType.ASTRING, null), BuiltinType.ASTRING, BuiltinType.ASTRING, - BuiltinType.ASTRING, BuiltinType.ASTRING }, + BuiltinType.ASTRING, BuiltinType.ASTRING, + new AOrderedListType(new AOrderedListType(new AOrderedListType(BuiltinType.ASTRING, null), null), + null) }, //IsOpen? 
true); http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/5b870653/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java index b201af6..5f7dad0 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java @@ -15,6 +15,10 @@ package org.apache.asterix.bad.metadata; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + import org.apache.asterix.active.EntityId; import org.apache.asterix.bad.BADConstants; import org.apache.asterix.common.functions.FunctionSignature; @@ -34,20 +38,50 @@ public class Channel implements IExtensionMetadataEntity { private final String resultsDatasetName; private final String duration; private final FunctionSignature function; + private final List<String> functionAsPath; + /* + Dependencies are stored as an array of size two: + element 0 is a list of dataset dependencies + -stored as lists of [DataverseName, Dataset] for the datasets + element 1 is a list of function dependencies + -stored as lists of [DataverseName, FunctionName, Arity] for the functions + */ + private final List<List<List<String>>> dependencies; public Channel(String dataverseName, String channelName, String subscriptionsDataset, String resultsDataset, - FunctionSignature function, String duration) { + FunctionSignature function, String duration, List<List<List<String>>> dependencies) { this.channelId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverseName, channelName); this.function = function; this.duration = duration; this.resultsDatasetName = resultsDataset; this.subscriptionsDatasetName = subscriptionsDataset; + if (this.function.getNamespace() == null) { + 
this.function.setNamespace(dataverseName); + } + functionAsPath = Arrays.asList(this.function.getNamespace(), this.function.getName(), + Integer.toString(this.function.getArity())); + if (dependencies == null) { + this.dependencies = new ArrayList<>(); + this.dependencies.add(new ArrayList<>()); + this.dependencies.add(new ArrayList<>()); + List<String> resultsList = Arrays.asList(dataverseName, resultsDatasetName); + List<String> subscriptionList = Arrays.asList(dataverseName, subscriptionsDatasetName); + this.dependencies.get(0).add(resultsList); + this.dependencies.get(0).add(subscriptionList); + this.dependencies.get(1).add(functionAsPath); + } else { + this.dependencies = dependencies; + } } public EntityId getChannelId() { return channelId; } + public List<List<List<String>>> getDependencies() { + return dependencies; + } + public String getSubscriptionsDataset() { return subscriptionsDatasetName; } @@ -60,6 +94,10 @@ public class Channel implements IExtensionMetadataEntity { return duration; } + public List<String> getFunctionAsPath() { + return functionAsPath; + } + public FunctionSignature getFunction() { return function; } http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/5b870653/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java index d577260..14db134 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java @@ -18,15 +18,23 @@ package org.apache.asterix.bad.metadata; import java.io.ByteArrayInputStream; import java.io.DataInput; import java.io.DataInputStream; +import java.util.ArrayList; +import java.util.List; +import 
org.apache.asterix.builders.OrderedListBuilder; import org.apache.asterix.common.exceptions.MetadataException; import org.apache.asterix.common.functions.FunctionSignature; import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; import org.apache.asterix.metadata.entitytupletranslators.AbstractTupleTranslator; +import org.apache.asterix.om.base.AOrderedList; import org.apache.asterix.om.base.ARecord; import org.apache.asterix.om.base.AString; +import org.apache.asterix.om.base.IACursor; +import org.apache.asterix.om.types.AOrderedListType; +import org.apache.asterix.om.types.BuiltinType; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; /** @@ -42,11 +50,16 @@ public class ChannelTupleTranslator extends AbstractTupleTranslator<Channel> { // Payload field containing serialized feed. 
public static final int CHANNEL_PAYLOAD_TUPLE_FIELD_INDEX = 2; - @SuppressWarnings("unchecked") private ISerializerDeserializer<ARecord> recordSerDes = SerializerDeserializerProvider.INSTANCE .getSerializerDeserializer(BADMetadataRecordTypes.CHANNEL_RECORDTYPE); - @SuppressWarnings("unchecked") + private transient OrderedListBuilder dependenciesListBuilder = new OrderedListBuilder(); + private transient OrderedListBuilder dependencyListBuilder = new OrderedListBuilder(); + private transient OrderedListBuilder dependencyNameListBuilder = new OrderedListBuilder(); + private transient AOrderedListType stringList = new AOrderedListType(BuiltinType.ASTRING, null); + private transient AOrderedListType ListofLists = + new AOrderedListType(new AOrderedListType(BuiltinType.ASTRING, null), null); + public ChannelTupleTranslator(boolean getTuple) { super(getTuple, BADMetadataIndexes.NUM_FIELDS_CHANNEL_IDX); } @@ -74,30 +87,47 @@ public class ChannelTupleTranslator extends AbstractTupleTranslator<Channel> { String resultsName = ((AString) channelRecord.getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_RESULTS_NAME_FIELD_INDEX)) .getStringValue(); - String fName = - ((AString) channelRecord.getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_FUNCTION_FIELD_INDEX)) - .getStringValue(); + + IACursor cursor = ((AOrderedList) channelRecord + .getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_FUNCTION_FIELD_INDEX)).getCursor(); + List<String> functionSignature = new ArrayList<>(); + while (cursor.next()) { + functionSignature.add(((AString) cursor.get()).getStringValue()); + } + String duration = ((AString) channelRecord.getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_DURATION_FIELD_INDEX)) .getStringValue(); - FunctionSignature signature = null; - - String[] qnameComponents = fName.split("\\."); - String functionDataverse; - String functionName; - if (qnameComponents.length == 2) { - functionDataverse = qnameComponents[0]; - functionName = qnameComponents[1]; - } else { 
- functionDataverse = dataverseName; - functionName = qnameComponents[0]; + IACursor dependenciesCursor = ((AOrderedList) channelRecord + .getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_DEPENDENCIES_FIELD_INDEX)).getCursor(); + List<List<List<String>>> dependencies = new ArrayList<>(); + AOrderedList dependencyList; + AOrderedList qualifiedList; + int i = 0; + while (dependenciesCursor.next()) { + dependencies.add(new ArrayList<>()); + dependencyList = (AOrderedList) dependenciesCursor.get(); + IACursor qualifiedDependencyCursor = dependencyList.getCursor(); + int j = 0; + while (qualifiedDependencyCursor.next()) { + qualifiedList = (AOrderedList) qualifiedDependencyCursor.get(); + IACursor qualifiedNameCursor = qualifiedList.getCursor(); + dependencies.get(i).add(new ArrayList<>()); + while (qualifiedNameCursor.next()) { + dependencies.get(i).get(j).add(((AString) qualifiedNameCursor.get()).getStringValue()); + } + j++; + } + i++; + } - String[] nameComponents = functionName.split("@"); - signature = new FunctionSignature(functionDataverse, nameComponents[0], Integer.parseInt(nameComponents[1])); + FunctionSignature signature = new FunctionSignature(functionSignature.get(0), functionSignature.get(1), + Integer.parseInt(functionSignature.get(2))); - channel = new Channel(dataverseName, channelName, subscriptionsName, resultsName, signature, duration); + channel = new Channel(dataverseName, channelName, subscriptionsName, resultsName, signature, duration, + dependencies); return channel; } @@ -141,9 +171,17 @@ public class ChannelTupleTranslator extends AbstractTupleTranslator<Channel> { recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_RESULTS_NAME_FIELD_INDEX, fieldValue); // write field 4 + OrderedListBuilder listBuilder = new OrderedListBuilder(); + ArrayBackedValueStorage itemValue = new ArrayBackedValueStorage(); + listBuilder.reset(stringList); + for (String pathPart : channel.getFunctionAsPath()) { + itemValue.reset(); + 
aString.setValue(pathPart); + stringSerde.serialize(aString, itemValue.getDataOutput()); + listBuilder.addItem(itemValue); + } fieldValue.reset(); - aString.setValue(channel.getFunction().toString()); - stringSerde.serialize(aString, fieldValue.getDataOutput()); + listBuilder.write(fieldValue.getDataOutput(), true); recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_FUNCTION_FIELD_INDEX, fieldValue); // write field 5 @@ -152,6 +190,33 @@ public class ChannelTupleTranslator extends AbstractTupleTranslator<Channel> { stringSerde.serialize(aString, fieldValue.getDataOutput()); recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_DURATION_FIELD_INDEX, fieldValue); + // write field 6 + dependenciesListBuilder.reset((AOrderedListType) BADMetadataRecordTypes.CHANNEL_RECORDTYPE + .getFieldTypes()[BADMetadataRecordTypes.CHANNEL_ARECORD_DEPENDENCIES_FIELD_INDEX]); + List<List<List<String>>> dependenciesList = channel.getDependencies(); + for (List<List<String>> dependencies : dependenciesList) { + dependencyListBuilder.reset(ListofLists); + for (List<String> dependency : dependencies) { + dependencyNameListBuilder.reset(stringList); + for (String subName : dependency) { + itemValue.reset(); + aString.setValue(subName); + stringSerde.serialize(aString, itemValue.getDataOutput()); + dependencyNameListBuilder.addItem(itemValue); + } + itemValue.reset(); + dependencyNameListBuilder.write(itemValue.getDataOutput(), true); + dependencyListBuilder.addItem(itemValue); + + } + itemValue.reset(); + dependencyListBuilder.write(itemValue.getDataOutput(), true); + dependenciesListBuilder.addItem(itemValue); + } + fieldValue.reset(); + dependenciesListBuilder.write(fieldValue.getDataOutput(), true); + recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_DEPENDENCIES_FIELD_INDEX, fieldValue); + // write record recordBuilder.write(tupleBuilder.getDataOutput(), true); 
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/5b870653/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java index 950612c..13f9e0d 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java @@ -73,7 +73,6 @@ public class DeployedJobSpecEventListener implements IActiveEntityEventsListener protected final List<IActiveEntityEventSubscriber> subscribers = new ArrayList<>(); protected final ICcApplicationContext appCtx; protected final EntityId entityId; - protected final List<IDataset> datasets; protected final ActiveEvent statsUpdatedEvent; protected long statsTimestamp; protected String stats; @@ -83,10 +82,9 @@ public class DeployedJobSpecEventListener implements IActiveEntityEventsListener protected int numRegistered; public DeployedJobSpecEventListener(ICcApplicationContext appCtx, EntityId entityId, PrecompiledType type, - List<IDataset> datasets, AlgebricksAbsolutePartitionConstraint locations, String runtimeName) { + AlgebricksAbsolutePartitionConstraint locations, String runtimeName) { this.appCtx = appCtx; this.entityId = entityId; - this.datasets = datasets; this.state = ActivityState.STOPPED; this.statsTimestamp = -1; this.statsRequestState = RequestState.INIT; @@ -133,7 +131,7 @@ public class DeployedJobSpecEventListener implements IActiveEntityEventsListener @Override public boolean isEntityUsingDataset(IDataset dataset) { - return datasets.contains(dataset); + return false; } public JobId getJobId() { 
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/5b870653/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java index e3ed7fc..5712539 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java @@ -18,6 +18,7 @@ */ package org.apache.asterix.bad.metadata; +import java.util.ArrayList; import java.util.List; import org.apache.asterix.active.EntityId; @@ -39,9 +40,17 @@ public class Procedure implements IExtensionMetadataEntity { private final String returnType; private final String language; private final String duration; + /* + Dependencies are stored as an array of size two: + element 0 is a list of dataset dependencies + -stored as lists of [DataverseName, Dataset] for the datasets + element 1 is a list of function dependencies + -stored as lists of [DataverseName, FunctionName, Arity] for the functions + */ + private final List<List<List<String>>> dependencies; public Procedure(String dataverseName, String functionName, int arity, List<String> params, String returnType, - String functionBody, String language, String duration) { + String functionBody, String language, String duration, List<List<List<String>>> dependencies) { this.procedureId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverseName, functionName); this.params = params; this.body = functionBody; @@ -49,6 +58,13 @@ public class Procedure implements IExtensionMetadataEntity { this.language = language; this.arity = arity; this.duration = duration; + if (dependencies == null) { + this.dependencies = new ArrayList<>(); + this.dependencies.add(new ArrayList<>()); + this.dependencies.add(new ArrayList<>()); + } else { + this.dependencies = 
dependencies; + } } public EntityId getEntityId() { @@ -79,6 +95,10 @@ public class Procedure implements IExtensionMetadataEntity { return duration; } + public List<List<List<String>>> getDependencies() { + return dependencies; + } + @Override public boolean equals(Object other) { if (this == other) { http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/5b870653/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java index 1aa633f..0a6acb9 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java @@ -34,6 +34,7 @@ import org.apache.asterix.om.base.ARecord; import org.apache.asterix.om.base.AString; import org.apache.asterix.om.base.IACursor; import org.apache.asterix.om.types.AOrderedListType; +import org.apache.asterix.om.types.BuiltinType; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; @@ -54,10 +55,16 @@ public class ProcedureTupleTranslator extends AbstractTupleTranslator<Procedure> // Payload field containing serialized Procedure. 
public static final int PROCEDURE_PAYLOAD_TUPLE_FIELD_INDEX = 3; - @SuppressWarnings("unchecked") private ISerializerDeserializer<ARecord> recordSerDes = SerializerDeserializerProvider.INSTANCE .getSerializerDeserializer(BADMetadataRecordTypes.PROCEDURE_RECORDTYPE); + private transient OrderedListBuilder dependenciesListBuilder = new OrderedListBuilder(); + private transient OrderedListBuilder dependencyListBuilder = new OrderedListBuilder(); + private transient OrderedListBuilder dependencyNameListBuilder = new OrderedListBuilder(); + private transient AOrderedListType stringList = new AOrderedListType(BuiltinType.ASTRING, null); + private transient AOrderedListType ListofLists = + new AOrderedListType(new AOrderedListType(BuiltinType.ASTRING, null), null); + protected ProcedureTupleTranslator(boolean getTuple) { super(getTuple, BADMetadataIndexes.NUM_FIELDS_PROCEDURE_IDX); } @@ -104,8 +111,32 @@ public class ProcedureTupleTranslator extends AbstractTupleTranslator<Procedure> .getValueByPos(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_DURATION_FIELD_INDEX)) .getStringValue(); + IACursor dependenciesCursor = ((AOrderedList) procedureRecord + .getValueByPos(BADMetadataRecordTypes.PROCEDURE_ARECORD_DEPENDENCIES_FIELD_INDEX)).getCursor(); + List<List<List<String>>> dependencies = new ArrayList<>(); + AOrderedList dependencyList; + AOrderedList qualifiedList; + int i = 0; + while (dependenciesCursor.next()) { + dependencies.add(new ArrayList<>()); + dependencyList = (AOrderedList) dependenciesCursor.get(); + IACursor qualifiedDependencyCursor = dependencyList.getCursor(); + int j = 0; + while (qualifiedDependencyCursor.next()) { + qualifiedList = (AOrderedList) qualifiedDependencyCursor.get(); + IACursor qualifiedNameCursor = qualifiedList.getCursor(); + dependencies.get(i).add(new ArrayList<>()); + while (qualifiedNameCursor.next()) { + dependencies.get(i).get(j).add(((AString) qualifiedNameCursor.get()).getStringValue()); + } + j++; + } + i++; + + } + return 
new Procedure(dataverseName, procedureName, Integer.parseInt(arity), params, returnType, definition, - language, duration); + language, duration, dependencies); } @@ -185,6 +216,33 @@ public class ProcedureTupleTranslator extends AbstractTupleTranslator<Procedure> stringSerde.serialize(aString, fieldValue.getDataOutput()); recordBuilder.addField(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_DURATION_FIELD_INDEX, fieldValue); + // write field 8 + dependenciesListBuilder.reset((AOrderedListType) BADMetadataRecordTypes.PROCEDURE_RECORDTYPE + .getFieldTypes()[BADMetadataRecordTypes.PROCEDURE_ARECORD_DEPENDENCIES_FIELD_INDEX]); + List<List<List<String>>> dependenciesList = procedure.getDependencies(); + for (List<List<String>> dependencies : dependenciesList) { + dependencyListBuilder.reset(ListofLists); + for (List<String> dependency : dependencies) { + dependencyNameListBuilder.reset(stringList); + for (String subName : dependency) { + itemValue.reset(); + aString.setValue(subName); + stringSerde.serialize(aString, itemValue.getDataOutput()); + dependencyNameListBuilder.addItem(itemValue); + } + itemValue.reset(); + dependencyNameListBuilder.write(itemValue.getDataOutput(), true); + dependencyListBuilder.addItem(itemValue); + + } + itemValue.reset(); + dependencyListBuilder.write(itemValue.getDataOutput(), true); + dependenciesListBuilder.addItem(itemValue); + } + fieldValue.reset(); + dependenciesListBuilder.write(fieldValue.getDataOutput(), true); + recordBuilder.addField(BADMetadataRecordTypes.PROCEDURE_ARECORD_DEPENDENCIES_FIELD_INDEX, fieldValue); + // write record recordBuilder.write(tupleBuilder.getDataOutput(), true); tupleBuilder.addFieldEndOffset(); http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/5b870653/asterix-bad/src/main/resources/lang-extension/lang.txt ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/resources/lang-extension/lang.txt 
b/asterix-bad/src/main/resources/lang-extension/lang.txt index 7c5931c..02aba78 100644 --- a/asterix-bad/src/main/resources/lang-extension/lang.txt +++ b/asterix-bad/src/main/resources/lang-extension/lang.txt @@ -129,9 +129,14 @@ CreateProcedureStatement ProcedureSpecification() throws ParseException: Token endPos; Statement functionBodyExpr; Expression period = null; + String currentDataverse = defaultDataverse; + createNewScope(); } { "procedure" fctName = FunctionName() + { + defaultDataverse = fctName.dataverse; + } paramList = ParameterList() <LEFTBRACE> { @@ -149,6 +154,7 @@ CreateProcedureStatement ProcedureSpecification() throws ParseException: functionBody = extractFragment(beginPos.beginLine, beginPos.beginColumn, endPos.beginLine, endPos.beginColumn); signature = new FunctionSignature(fctName.dataverse, fctName.function, paramList.size()); removeCurrentScope(); + defaultDataverse = currentDataverse; } ("period" period = FunctionCallExpr())? { http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/5b870653/asterix-bad/src/test/resources/runtimets/queries/channel/drop_function/drop_function.1.ddl.sqlpp ---------------------------------------------------------------------- diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/drop_function/drop_function.1.ddl.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/channel/drop_function/drop_function.1.ddl.sqlpp new file mode 100644 index 0000000..a5d3775 --- /dev/null +++ b/asterix-bad/src/test/resources/runtimets/queries/channel/drop_function/drop_function.1.ddl.sqlpp @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* +* Description : Drop Channel Function +* Expected Res : Error +* Date : Jan 2018 +* Author : Steven Jacobs +*/ + +drop dataverse channels if exists; +create dataverse channels; +use channels; + +create type TweetMessageTypeuuid as closed { + tweetid: uuid, + sender_location: point, + send_time: datetime, + referred_topics: {{ string }}, + message_text: string, + countA: int32, + countB: int32 +}; + + +create dataset TweetMessageuuids(TweetMessageTypeuuid) +primary key tweetid autogenerated; + +create function NearbyTweetsContainingText(place, text) { + (select m.message_text + from TweetMessageuuids m + where contains(m.message_text,text) + and spatial_intersect(m.sender_location, place)) +}; + +create dataverse two; +use two; + +create repetitive channel nearbyTweetChannel using channels.NearbyTweetsContainingText@2 period duration("PT10M"); + +use channels; +drop function NearbyTweetsContainingText@2; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/5b870653/asterix-bad/src/test/resources/runtimets/queries/channel/drop_function_dataverse/drop_function_dataverse.1.ddl.sqlpp ---------------------------------------------------------------------- diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/drop_function_dataverse/drop_function_dataverse.1.ddl.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/channel/drop_function_dataverse/drop_function_dataverse.1.ddl.sqlpp new file mode 100644 index 0000000..d1047b0 --- /dev/null +++ 
b/asterix-bad/src/test/resources/runtimets/queries/channel/drop_function_dataverse/drop_function_dataverse.1.ddl.sqlpp @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* +* Description : Drop Channel Function Dataverse +* Expected Res : Error +* Date : Jan 2018 +* Author : Steven Jacobs +*/ + +drop dataverse channels if exists; +create dataverse channels; +use channels; + +create type TweetMessageTypeuuid as closed { + tweetid: uuid, + sender_location: point, + send_time: datetime, + referred_topics: {{ string }}, + message_text: string, + countA: int32, + countB: int32 +}; + + +create dataset TweetMessageuuids(TweetMessageTypeuuid) +primary key tweetid autogenerated; + +create function NearbyTweetsContainingText(place, text) { + (select m.message_text + from TweetMessageuuids m + where contains(m.message_text,text) + and spatial_intersect(m.sender_location, place)) +}; + +create dataverse two; +use two; + +create repetitive channel nearbyTweetChannel using channels.NearbyTweetsContainingText@2 period duration("PT10M"); + +drop dataverse channels; \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/5b870653/asterix-bad/src/test/resources/runtimets/queries/channel/drop_results/drop_results.1.ddl.sqlpp ---------------------------------------------------------------------- diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/drop_results/drop_results.1.ddl.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/channel/drop_results/drop_results.1.ddl.sqlpp new file mode 100644 index 0000000..432f3c5 --- /dev/null +++ b/asterix-bad/src/test/resources/runtimets/queries/channel/drop_results/drop_results.1.ddl.sqlpp @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +/* +* Description : Drop Channel Results +* Expected Res : Error +* Date : Jan 2018 +* Author : Steven Jacobs +*/ + +drop dataverse two if exists; +drop dataverse channels if exists; +create dataverse channels; +use channels; + +create type TweetMessageTypeuuid as closed { + tweetid: uuid, + sender_location: point, + send_time: datetime, + referred_topics: {{ string }}, + message_text: string, + countA: int32, + countB: int32 +}; + + +create dataset TweetMessageuuids(TweetMessageTypeuuid) +primary key tweetid autogenerated; + +create function NearbyTweetsContainingText(place, text) { + (select m.message_text + from TweetMessageuuids m + where contains(m.message_text,text) + and spatial_intersect(m.sender_location, place)) +}; + +create dataverse two; +use two; + +create repetitive channel nearbyTweetChannel using channels.NearbyTweetsContainingText@2 period duration("PT10M"); + +drop dataset two.nearbyTweetChannelResults; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/5b870653/asterix-bad/src/test/resources/runtimets/queries/channel/drop_subscriptions/drop_subscriptions.1.ddl.sqlpp ---------------------------------------------------------------------- diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/drop_subscriptions/drop_subscriptions.1.ddl.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/channel/drop_subscriptions/drop_subscriptions.1.ddl.sqlpp new file mode 100644 index 0000000..f6dc2bf --- /dev/null +++ b/asterix-bad/src/test/resources/runtimets/queries/channel/drop_subscriptions/drop_subscriptions.1.ddl.sqlpp @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* +* Description : Drop Channel Subscriptions +* Expected Res : Error +* Date : Jan 2018 +* Author : Steven Jacobs +*/ + +drop dataverse two if exists; +drop dataverse channels if exists; +create dataverse channels; +use channels; + +create type TweetMessageTypeuuid as closed { + tweetid: uuid, + sender_location: point, + send_time: datetime, + referred_topics: {{ string }}, + message_text: string, + countA: int32, + countB: int32 +}; + + +create dataset TweetMessageuuids(TweetMessageTypeuuid) +primary key tweetid autogenerated; + +create function NearbyTweetsContainingText(place, text) { + (select m.message_text + from TweetMessageuuids m + where contains(m.message_text,text) + and spatial_intersect(m.sender_location, place)) +}; + +create dataverse two; +use two; + +create repetitive channel nearbyTweetChannel using channels.NearbyTweetsContainingText@2 period duration("PT10M"); + +drop dataset two.nearbyTweetChannelSubscriptions; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/5b870653/asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.1.ddl.sqlpp ---------------------------------------------------------------------- diff --git a/asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.1.ddl.sqlpp 
b/asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.1.ddl.sqlpp index fe1db99..638f7b5 100644 --- a/asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.1.ddl.sqlpp +++ b/asterix-bad/src/test/resources/runtimets/queries/channel/room_occupants/room_occupants.1.ddl.sqlpp @@ -23,6 +23,7 @@ * Author : Steven Jacobs */ +drop dataverse two if exists; drop dataverse channels if exists; create dataverse channels; use channels; http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/5b870653/asterix-bad/src/test/resources/runtimets/queries/procedure/create_procedure_check_metadata/create_procedure_check_metadata.1.ddl.sqlpp ---------------------------------------------------------------------- diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/create_procedure_check_metadata/create_procedure_check_metadata.1.ddl.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/procedure/create_procedure_check_metadata/create_procedure_check_metadata.1.ddl.sqlpp new file mode 100644 index 0000000..71b2ed4 --- /dev/null +++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/create_procedure_check_metadata/create_procedure_check_metadata.1.ddl.sqlpp @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* +* Description : Create Procedure Check Metadata +* Expected Res : Success +* Date : Jan 2017 +* Author : Steven Jacobs +*/ + +drop dataverse two if exists; +drop dataverse channels if exists; +create dataverse channels; +use channels; +create type myLocation as { + id: int +}; +create dataset UserLocations(myLocation) +primary key id; + +create function really_contains(word,letter){ +contains(word,letter) +}; + +create dataverse two; +use two; + +create function really_contains(word,letter){ +contains(word,letter) +}; + +create type myLocation as { + id: int +}; +create dataset UserLocations(myLocation) +primary key id; + +create procedure selectSome(r, otherRoom) { +select roomNumber from channels.UserLocations +where roomNumber = r +or roomNumber = otherRoom +and channels.really_contains(roomNumber,"l") +order by id +}; + +create procedure deleteSome(r, otherRoom) { +delete from channels.UserLocations +where roomNumber = r +or roomNumber = otherRoom +and channels.really_contains(roomNumber,"l") +}; + +create procedure addMe() { + insert into channels.UserLocations([ + {"timeStamp":current_datetime(), "roomNumber":222}] + ) +}; + +create procedure localSelectSome(r, otherRoom) { +select roomNumber from UserLocations +where roomNumber = r +or roomNumber = otherRoom +and really_contains(roomNumber,"l") +order by id +}; + +create procedure localDeleteSome(r, otherRoom) { +delete from UserLocations +where roomNumber = r +or roomNumber = otherRoom +and really_contains(roomNumber,"l") +}; + +create procedure localAddMe() { + insert into UserLocations([ + {"timeStamp":current_datetime(), "roomNumber":222}] + ) +}; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/5b870653/asterix-bad/src/test/resources/runtimets/queries/procedure/create_procedure_check_metadata/create_procedure_check_metadata.2.query.sqlpp 
---------------------------------------------------------------------- diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/create_procedure_check_metadata/create_procedure_check_metadata.2.query.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/procedure/create_procedure_check_metadata/create_procedure_check_metadata.2.query.sqlpp new file mode 100644 index 0000000..023c343 --- /dev/null +++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/create_procedure_check_metadata/create_procedure_check_metadata.2.query.sqlpp @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +select value x +from Metadata.`Procedure` x +order by x.ProcedureName; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/5b870653/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.1.ddl.sqlpp ---------------------------------------------------------------------- diff --git a/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.1.ddl.sqlpp b/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.1.ddl.sqlpp index 7dbf136..905211f 100644 --- a/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.1.ddl.sqlpp +++ b/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.1.ddl.sqlpp @@ -23,6 +23,7 @@ * Author : Steven Jacobs */ +drop dataverse two if exists; drop dataverse channels if exists; create dataverse channels; use channels;
