Repository: asterixdb-bad Updated Branches: refs/heads/master cb77cb275 -> 4345dffae
[BAD][RT] EventListener change to accommondate the new interfaces Adapt the PrecompiledJobEventListener to follow the new interfaces. Change-Id: I121acc01f2bb56ce2bf43f6358da9158d7c7e7f7 Project: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/commit/4345dffa Tree: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/tree/4345dffa Diff: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/diff/4345dffa Branch: refs/heads/master Commit: 4345dffae4ca1a78e1ce9e2ce07f2973da046838 Parents: cb77cb2 Author: Xikui Wang <[email protected]> Authored: Wed Jun 28 20:53:12 2017 -0700 Committer: Xikui Wang <[email protected]> Committed: Wed Jun 28 19:54:11 2017 -0800 ---------------------------------------------------------------------- .../lang/statement/CreateChannelStatement.java | 50 ++++++++++---------- .../statement/CreateProcedureStatement.java | 30 +++++------- .../metadata/PrecompiledJobEventListener.java | 17 +++++-- 3 files changed, 49 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/4345dffa/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java index 09cc3e5..7d91d2e 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java @@ -21,10 +21,7 @@ package org.apache.asterix.bad.lang.statement; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.StringReader; -import java.util.ArrayList; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.List; +import java.util.*; import java.util.concurrent.ScheduledExecutorService; import java.util.logging.Level; import java.util.logging.Logger; @@ -70,6 +67,7 @@ import org.apache.asterix.translator.IStatementExecutor.ResultDelivery; import org.apache.asterix.translator.IStatementExecutor.Stats; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.constraints.Constraint; import org.apache.hyracks.api.dataset.IHyracksDataset; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobFlag; @@ -133,13 +131,11 @@ public class CreateChannelStatement implements IExtensionStatement { return channelResultsInsertQuery; } - @Override - public byte getCategory() { + @Override public byte getCategory() { return Category.DDL; } - @Override - public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException { + @Override public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException { return null; } @@ -164,8 +160,7 @@ public class CreateChannelStatement implements IExtensionStatement { } - @Override - public byte getKind() { + @Override public byte getKind() { return Kind.EXTENSION; } @@ -193,13 +188,14 @@ public class CreateChannelStatement implements IExtensionStatement { fieldNames.add(BADConstants.ResultId); partitionFields.add(fieldNames); idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null, false); - DatasetDecl createResultsDataset = new DatasetDecl(new Identifier(dataverse), resultsName, - new Identifier(BADConstants.BAD_DATAVERSE_NAME), resultsTypeName, null, null, null, null, - new HashMap<String, String>(), new HashMap<String, String>(), DatasetType.INTERNAL, idd, true); + DatasetDecl createResultsDataset = + new DatasetDecl(new Identifier(dataverse), resultsName, new Identifier(BADConstants.BAD_DATAVERSE_NAME), + resultsTypeName, null, null, null, null, new HashMap<String, String>(), + new HashMap<String, String>(), DatasetType.INTERNAL, idd, true); //Run both statements to create datasets - ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createSubscriptionsDataset, - hcc); + ((QueryTranslator) statementExecutor) + .handleCreateDatasetStatement(metadataProvider, createSubscriptionsDataset, hcc); metadataProvider.getLocks().reset(); ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createResultsDataset, hcc); @@ -235,8 +231,9 @@ public class CreateChannelStatement implements IExtensionStatement { SetStatement ss = (SetStatement) fStatements.get(0); metadataProvider.getConfig().put(ss.getPropName(), ss.getPropValue()); - return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, fStatements.get(1), - hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null, null); + return ((QueryTranslator) statementExecutor) + .handleInsertUpsertStatement(metadataProvider, fStatements.get(1), hcc, hdc, ResultDelivery.ASYNC, null, + stats, true, null, null); } private void setupExecutorJob(EntityId entityId, JobSpecification channeljobSpec, IHyracksClientConnection hcc, @@ -247,15 +244,15 @@ public class CreateChannelStatement implements IExtensionStatement { if (predistributed) { jobId = hcc.distributeJob(channeljobSpec); } - ScheduledExecutorService ses = ChannelJobService.startJob(channeljobSpec, EnumSet.noneOf(JobFlag.class), - jobId, hcc, ChannelJobService.findPeriod(duration)); + ScheduledExecutorService ses = ChannelJobService + .startJob(channeljobSpec, EnumSet.noneOf(JobFlag.class), jobId, hcc, + ChannelJobService.findPeriod(duration)); listener.storeDistributedInfo(jobId, ses, null); } } - @Override - public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider, + @Override public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider, IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats, int resultSetIdCounter) throws HyracksDataException, AlgebricksException { @@ -310,12 +307,12 @@ public class CreateChannelStatement implements IExtensionStatement { metadataProvider.getDefaultDataverse(), metadataProvider.getStorageComponentProvider()); tempMdProvider.setConfig(metadataProvider.getConfig()); //Create Channel Datasets - createDatasets(statementExecutor, subscriptionsName, resultsName, tempMdProvider, hcc, hdc, - dataverse); + createDatasets(statementExecutor, subscriptionsName, resultsName, tempMdProvider, hcc, hdc, dataverse); tempMdProvider.getLocks().reset(); //Create Channel Internal Job - JobSpecification channeljobSpec = createChannelJob(statementExecutor, subscriptionsName, resultsName, - tempMdProvider, hcc, hdc, stats, dataverse); + JobSpecification channeljobSpec = + createChannelJob(statementExecutor, subscriptionsName, resultsName, tempMdProvider, hcc, hdc, stats, + dataverse); // Now we subscribe if (listener == null) { @@ -323,7 +320,8 @@ public class CreateChannelStatement implements IExtensionStatement { datasets.add(MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, subscriptionsName.getValue())); datasets.add(MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, resultsName.getValue())); //TODO: Add datasets used by channel function - listener = new PrecompiledJobEventListener(entityId, PrecompiledType.CHANNEL, datasets); + listener = new PrecompiledJobEventListener(appCtx, entityId, PrecompiledType.CHANNEL, datasets, null, + "BadListener"); activeEventHandler.registerListener(listener); } http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/4345dffa/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java index dfc3ed3..d203905 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java @@ -96,8 +96,7 @@ public class CreateProcedureStatement implements IExtensionStatement { this.period = (CallExpr) period; } - @Override - public byte getKind() { + @Override public byte getKind() { return Kind.EXTENSION; } @@ -109,8 +108,7 @@ public class CreateProcedureStatement implements IExtensionStatement { return signature; } - @Override - public byte getCategory() { + @Override public byte getCategory() { return Category.DDL; } @@ -118,8 +116,7 @@ public class CreateProcedureStatement implements IExtensionStatement { return period; } - @Override - public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException { + @Override public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException { return null; } @@ -170,10 +167,9 @@ public class CreateProcedureStatement implements IExtensionStatement { throw new CompilationException("Procedure can only execute a single statement"); } if (fStatements.get(0).getKind() == Statement.Kind.INSERT) { - return new Pair<>( - ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, - fStatements.get(0), hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null, null), - PrecompiledType.INSERT); + return new Pair<>(((QueryTranslator) statementExecutor) + .handleInsertUpsertStatement(metadataProvider, fStatements.get(0), hcc, hdc, ResultDelivery.ASYNC, + null, stats, true, null, null), PrecompiledType.INSERT); } else if (fStatements.get(0).getKind() == Statement.Kind.QUERY) { Pair<JobSpecification, PrecompiledType> pair = new Pair<>(compileQueryJob(statementExecutor, metadataProvider, hcc, (Query) fStatements.get(0)), @@ -183,8 +179,8 @@ public class CreateProcedureStatement implements IExtensionStatement { } else if (fStatements.get(0).getKind() == Statement.Kind.DELETE) { SqlppDeleteRewriteVisitor visitor = new SqlppDeleteRewriteVisitor(); fStatements.get(0).accept(visitor, null); - return new Pair<>(((QueryTranslator) statementExecutor).handleDeleteStatement(metadataProvider, - fStatements.get(0), hcc, true), PrecompiledType.DELETE); + return new Pair<>(((QueryTranslator) statementExecutor) + .handleDeleteStatement(metadataProvider, fStatements.get(0), hcc, true), PrecompiledType.DELETE); } else { throw new CompilationException("Procedure can only execute a single delete, insert, or query"); } @@ -197,8 +193,7 @@ public class CreateProcedureStatement implements IExtensionStatement { listener.storeDistributedInfo(jobId, null, new ResultReader(hdc, jobId, resultSetId)); } - @Override - public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider, + @Override public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider, IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats, int resultSetIdCounter) throws HyracksDataException, AlgebricksException { ICcApplicationContext appCtx = metadataProvider.getApplicationContext(); @@ -217,8 +212,8 @@ public class CreateProcedureStatement implements IExtensionStatement { try { mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); metadataProvider.setMetadataTxnContext(mdTxnCtx); - procedure = BADLangExtension.getProcedure(mdTxnCtx, dataverse, signature.getName(), - Integer.toString(signature.getArity())); + procedure = BADLangExtension + .getProcedure(mdTxnCtx, dataverse, signature.getName(), Integer.toString(signature.getArity())); if (procedure != null) { throw new AlgebricksException("A procedure with this name " + signature.getName() + " already exists."); } @@ -252,7 +247,8 @@ public class CreateProcedureStatement implements IExtensionStatement { // Now we subscribe if (listener == null) { //TODO: Add datasets used by channel function - listener = new PrecompiledJobEventListener(entityId, procedureJobSpec.second, new ArrayList<>()); + listener = new PrecompiledJobEventListener(appCtx, entityId, procedureJobSpec.second, new ArrayList<>(), + null, "BadListener"); activeEventHandler.registerListener(listener); } setupDistributedJob(entityId, procedureJobSpec.first, hcc, listener, tempMdProvider.getResultSetId(), hdc, http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/4345dffa/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java index 55547ea..5eb18d1 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java @@ -26,8 +26,10 @@ import org.apache.asterix.active.ActivityState; import org.apache.asterix.active.EntityId; import org.apache.asterix.active.IActiveEventSubscriber; import org.apache.asterix.app.result.ResultReader; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.metadata.IDataset; import org.apache.asterix.external.feed.management.ActiveEntityEventsListener; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobId; import org.apache.log4j.Logger; @@ -47,9 +49,9 @@ public class PrecompiledJobEventListener extends ActiveEntityEventsListener { private final PrecompiledType type; - public PrecompiledJobEventListener(EntityId entityId, PrecompiledType type, List<IDataset> datasets) { - this.entityId = entityId; - this.datasets = datasets; + public PrecompiledJobEventListener(ICcApplicationContext appCtx, EntityId entityId, PrecompiledType type, + List<IDataset> datasets, AlgebricksAbsolutePartitionConstraint locations, String runtimeName) { + super(appCtx, entityId, datasets, locations, runtimeName); state = ActivityState.STOPPED; this.type = type; } @@ -99,6 +101,11 @@ public class PrecompiledJobEventListener extends ActiveEntityEventsListener { } } + @Override + public void refreshStats(long l) throws HyracksDataException { + // no op + } + private synchronized void handleJobStartEvent(ActiveEvent message) throws Exception { if (LOGGER.isInfoEnabled()) { LOGGER.info("Channel Job started for " + entityId); @@ -113,7 +120,7 @@ public class PrecompiledJobEventListener extends ActiveEntityEventsListener { } @Override - public IActiveEventSubscriber subscribe(ActivityState state) throws HyracksDataException { - return null; + public synchronized void subscribe(IActiveEventSubscriber subscriber) throws HyracksDataException { + // no op } }
