http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index bd5c024..7725936 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -34,53 +34,50 @@ import java.util.Iterator; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Properties; import java.util.Set; -import java.util.Map.Entry; import java.util.concurrent.ExecutorService; import java.util.logging.Level; import java.util.logging.Logger; -import org.apache.asterix.active.ActiveJobNotificationHandler; -import org.apache.asterix.active.ActiveLifecycleListener; import org.apache.asterix.active.ActivityState; import org.apache.asterix.active.EntityId; import org.apache.asterix.active.IActiveEntityEventsListener; -import org.apache.asterix.active.IActiveEventSubscriber; +import org.apache.asterix.active.NoRetryPolicyFactory; import org.apache.asterix.algebra.extension.IExtensionStatement; import org.apache.asterix.api.common.APIFramework; import org.apache.asterix.api.http.server.AbstractQueryApiServlet; import org.apache.asterix.api.http.server.ApiServlet; import org.apache.asterix.api.http.server.ResultUtil; +import org.apache.asterix.app.active.ActiveEntityEventsListener; +import org.apache.asterix.app.active.ActiveNotificationHandler; +import org.apache.asterix.app.active.FeedEventsListener; import org.apache.asterix.app.result.ResultHandle; import org.apache.asterix.app.result.ResultReader; +import org.apache.asterix.common.api.IMetadataLockManager; import org.apache.asterix.common.config.ClusterProperties; -import org.apache.asterix.common.config.ExternalProperties; -import org.apache.asterix.common.config.GlobalConfig; import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp; import org.apache.asterix.common.config.DatasetConfig.IndexType; import org.apache.asterix.common.config.DatasetConfig.TransactionState; -import org.apache.asterix.common.context.IStorageComponentProvider; +import org.apache.asterix.common.config.ExternalProperties; +import org.apache.asterix.common.config.GlobalConfig; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.exceptions.MetadataException; import org.apache.asterix.common.functions.FunctionSignature; -import org.apache.asterix.common.metadata.IDataset; import org.apache.asterix.common.utils.JobUtils; import org.apache.asterix.common.utils.JobUtils.ProgressState; -import org.apache.asterix.compiler.provider.AqlCompilationProvider; import org.apache.asterix.compiler.provider.ILangCompilationProvider; -import org.apache.asterix.external.feed.management.ActiveEntityEventsListener; import org.apache.asterix.external.feed.management.FeedConnectionId; -import org.apache.asterix.external.feed.watch.WaitForStateSubscriber; import org.apache.asterix.external.indexing.ExternalFile; import org.apache.asterix.external.indexing.IndexingConstants; import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable; import org.apache.asterix.external.util.ExternalDataConstants; -import org.apache.asterix.file.StorageComponentProvider; import org.apache.asterix.formats.nontagged.TypeTraitProvider; import org.apache.asterix.lang.common.base.IReturningStatement; import org.apache.asterix.lang.common.base.IRewriterFactory; @@ -123,7 +120,6 @@ import org.apache.asterix.lang.common.statement.TypeDropStatement; import org.apache.asterix.lang.common.statement.WriteStatement; import org.apache.asterix.lang.common.struct.Identifier; import org.apache.asterix.metadata.IDatasetDetails; -import org.apache.asterix.metadata.MetadataException; import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.metadata.MetadataTransactionContext; import org.apache.asterix.metadata.bootstrap.MetadataBuiltinEntities; @@ -145,12 +141,12 @@ import org.apache.asterix.metadata.entities.InternalDatasetDetails; import org.apache.asterix.metadata.entities.NodeGroup; import org.apache.asterix.metadata.feeds.FeedMetadataUtil; import org.apache.asterix.metadata.lock.ExternalDatasetsRegistry; -import org.apache.asterix.metadata.lock.MetadataLockManager; import org.apache.asterix.metadata.utils.DatasetUtil; import org.apache.asterix.metadata.utils.ExternalIndexingOperations; import org.apache.asterix.metadata.utils.IndexUtil; import org.apache.asterix.metadata.utils.KeyFieldTypeUtil; import org.apache.asterix.metadata.utils.MetadataConstants; +import org.apache.asterix.metadata.utils.MetadataLockUtil; import org.apache.asterix.metadata.utils.MetadataUtil; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.ATypeTag; @@ -159,16 +155,16 @@ import org.apache.asterix.om.types.TypeSignature; import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory; import org.apache.asterix.translator.AbstractLangTranslator; -import org.apache.asterix.translator.IStatementExecutor; -import org.apache.asterix.translator.IStatementExecutorContext; -import org.apache.asterix.translator.SessionConfig; -import org.apache.asterix.translator.SessionOutput; -import org.apache.asterix.translator.TypeTranslator; import org.apache.asterix.translator.CompiledStatements.CompiledDeleteStatement; import org.apache.asterix.translator.CompiledStatements.CompiledInsertStatement; import org.apache.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement; import org.apache.asterix.translator.CompiledStatements.CompiledUpsertStatement; import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement; +import org.apache.asterix.translator.IStatementExecutor; +import org.apache.asterix.translator.IStatementExecutorContext; +import org.apache.asterix.translator.SessionConfig; +import org.apache.asterix.translator.SessionOutput; +import org.apache.asterix.translator.TypeTranslator; import org.apache.asterix.translator.util.ValidateUtil; import org.apache.asterix.utils.DataverseUtil; import org.apache.asterix.utils.FeedOperations; @@ -178,7 +174,6 @@ import org.apache.commons.lang3.mutable.Mutable; import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.commons.lang3.mutable.MutableObject; import org.apache.commons.lang3.tuple.Triple; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression.FunctionKind; @@ -217,18 +212,17 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen protected final List<FunctionDecl> declaredFunctions; protected final APIFramework apiFramework; protected final IRewriterFactory rewriterFactory; - protected final IStorageComponentProvider componentProvider; protected final ExecutorService executorService; protected final EnumSet<JobFlag> jobFlags = EnumSet.noneOf(JobFlag.class); + protected final IMetadataLockManager lockManager; public QueryTranslator(ICcApplicationContext appCtx, List<Statement> statements, SessionOutput output, - ILangCompilationProvider compliationProvider, IStorageComponentProvider componentProvider, - ExecutorService executorService) { + ILangCompilationProvider compliationProvider, ExecutorService executorService) { this.appCtx = appCtx; + this.lockManager = appCtx.getMetadataLockManager(); this.statements = statements; this.sessionOutput = output; this.sessionConfig = output.config(); - this.componentProvider = componentProvider; declaredFunctions = getDeclaredFunctions(statements); apiFramework = new APIFramework(compliationProvider); rewriterFactory = compliationProvider.getRewriterFactory(); @@ -281,7 +275,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } validateOperation(appCtx, activeDataverse, stmt); rewriteStatement(stmt); // Rewrite the statement's AST. - MetadataProvider metadataProvider = new MetadataProvider(appCtx, activeDataverse, componentProvider); + MetadataProvider metadataProvider = new MetadataProvider(appCtx, activeDataverse); metadataProvider.setWriterFactory(writerFactory); metadataProvider.setResultSerializerFactoryProvider(resultSerializerFactoryProvider); metadataProvider.setOutputFile(outputFile); @@ -431,7 +425,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen String dvName = dvd.getDataverseName().getValue(); MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); metadataProvider.setMetadataTxnContext(mdTxnCtx); - MetadataLockManager.INSTANCE.acquireDataverseReadLock(metadataProvider.getLocks(), dvName); + lockManager.acquireDataverseReadLock(metadataProvider.getLocks(), dvName); try { Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName); if (dv == null) { @@ -454,7 +448,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); metadataProvider.setMetadataTxnContext(mdTxnCtx); - MetadataLockManager.INSTANCE.acquireDataverseReadLock(metadataProvider.getLocks(), dvName); + lockManager.acquireDataverseReadLock(metadataProvider.getLocks(), dvName); try { Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName); if (dv != null) { @@ -529,7 +523,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); boolean bActiveTxn = true; metadataProvider.setMetadataTxnContext(mdTxnCtx); - MetadataLockManager.INSTANCE.createDatasetBegin(metadataProvider.getLocks(), dataverseName, + MetadataLockUtil.createDatasetBegin(lockManager, metadataProvider.getLocks(), dataverseName, itemTypeDataverseName, itemTypeDataverseName + "." + itemTypeName, metaItemTypeDataverseName, metaItemTypeDataverseName + "." + metaItemTypeName, nodegroupName, compactionPolicy, dataverseName + "." + datasetName, defaultCompactionPolicy); @@ -693,11 +687,11 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen protected static void validateIfResourceIsActiveInFeed(ICcApplicationContext appCtx, Dataset dataset) throws CompilationException { StringBuilder builder = null; - ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener(); - ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler(); + ActiveNotificationHandler activeEventHandler = + (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); IActiveEntityEventsListener[] listeners = activeEventHandler.getEventListeners(); for (IActiveEntityEventsListener listener : listeners) { - if (listener.isEntityUsingDataset(dataset)) { + if (listener.isEntityUsingDataset(dataset) && listener.isActive()) { if (builder == null) { builder = new StringBuilder(); } @@ -741,15 +735,15 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt; String dataverseName = getActiveDataverse(stmtCreateIndex.getDataverseName()); String datasetName = stmtCreateIndex.getDatasetName().getValue(); + String indexName = stmtCreateIndex.getIndexName().getValue(); List<Integer> keySourceIndicators = stmtCreateIndex.getFieldSourceIndicators(); MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); metadataProvider.setMetadataTxnContext(mdTxnCtx); - MetadataLockManager.INSTANCE.createIndexBegin(metadataProvider.getLocks(), dataverseName, - dataverseName + "." + datasetName); - String indexName = null; + String datasetFullyQualifiedName = dataverseName + "." + datasetName; Dataset ds = null; - // For external datasets Index index = null; + MetadataLockUtil.createIndexBegin(lockManager, metadataProvider.getLocks(), dataverseName, + datasetFullyQualifiedName); try { ds = metadataProvider.findDataset(dataverseName, datasetName); if (ds == null) { @@ -757,7 +751,6 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen "There is no dataset with this name " + datasetName + " in dataverse " + dataverseName); } - indexName = stmtCreateIndex.getIndexName().getValue(); index = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName, indexName); if (index != null) { @@ -1111,7 +1104,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen String typeName = stmtCreateType.getIdent().getValue(); MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); metadataProvider.setMetadataTxnContext(mdTxnCtx); - MetadataLockManager.INSTANCE.createTypeBegin(metadataProvider.getLocks(), dataverseName, + MetadataLockUtil.createTypeBegin(lockManager, metadataProvider.getLocks(), dataverseName, dataverseName + "." + typeName); try { Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName); @@ -1157,7 +1150,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen boolean bActiveTxn = true; metadataProvider.setMetadataTxnContext(mdTxnCtx); List<JobSpecification> jobsToExecute = new ArrayList<>(); - MetadataLockManager.INSTANCE.acquireDataverseWriteLock(metadataProvider.getLocks(), dataverseName); + lockManager.acquireDataverseWriteLock(metadataProvider.getLocks(), dataverseName); try { Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName); if (dv == null) { @@ -1168,26 +1161,31 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen throw new AlgebricksException("There is no dataverse with this name " + dataverseName + "."); } } + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); + bActiveTxn = false; // # disconnect all feeds from any datasets in the dataverse. - ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener(); - ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler(); + ActiveNotificationHandler activeEventHandler = + (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); IActiveEntityEventsListener[] activeListeners = activeEventHandler.getEventListeners(); - Identifier dvId = new Identifier(dataverseName); - MetadataProvider tempMdProvider = new MetadataProvider(appCtx, metadataProvider.getDefaultDataverse(), - metadataProvider.getStorageComponentProvider()); - tempMdProvider.setConfig(metadataProvider.getConfig()); for (IActiveEntityEventsListener listener : activeListeners) { EntityId activeEntityId = listener.getEntityId(); if (activeEntityId.getExtensionName().equals(Feed.EXTENSION_NAME) && activeEntityId.getDataverse().equals(dataverseName)) { - tempMdProvider.getLocks().reset(); - stopFeedBeforeDelete(new Pair<>(dvId, new Identifier(activeEntityId.getEntityName())), - tempMdProvider); - // prepare job to remove feed log storage - jobsToExecute.add(FeedOperations.buildRemoveFeedStorageJob(metadataProvider, - MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverseName, activeEntityId.getEntityName()))); + if (listener.getState() != ActivityState.STOPPED) { + ((ActiveEntityEventsListener) listener).stop(metadataProvider); + } + FeedEventsListener feedListener = (FeedEventsListener) listener; + mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); + bActiveTxn = true; + metadataProvider.setMetadataTxnContext(mdTxnCtx); + doDropFeed(hcc, metadataProvider, feedListener.getFeed()); + MetadataManager.INSTANCE.commitTransaction(metadataProvider.getMetadataTxnContext()); + bActiveTxn = false; } } + mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); + bActiveTxn = true; + metadataProvider.setMetadataTxnContext(mdTxnCtx); // #. prepare jobs which will drop corresponding datasets with indexes. List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dataverseName); @@ -1243,7 +1241,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen // Drops all node groups that no longer needed for (Dataset dataset : datasets) { String nodeGroup = dataset.getNodeGroupName(); - MetadataLockManager.INSTANCE.acquireNodeGroupWriteLock(metadataProvider.getLocks(), nodeGroup); + lockManager.acquireNodeGroupWriteLock(metadataProvider.getLocks(), nodeGroup); if (MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodeGroup) != null) { MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, nodeGroup, true); } @@ -1294,26 +1292,12 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } } - protected void stopFeedBeforeDelete(Pair<Identifier, Identifier> feedNameComp, MetadataProvider metadataProvider) { - StopFeedStatement disStmt = new StopFeedStatement(feedNameComp); - try { - handleStopFeedStatement(metadataProvider, disStmt); - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Stopped feed " + feedNameComp.second.getValue()); - } - } catch (Exception exception) { - if (LOGGER.isLoggable(Level.WARNING)) { - LOGGER.warning("Unable to stop feed " + feedNameComp.second.getValue() + exception); - } - } - } - public void handleDatasetDropStatement(MetadataProvider metadataProvider, Statement stmt, IHyracksClientConnection hcc) throws Exception { DropDatasetStatement stmtDelete = (DropDatasetStatement) stmt; String dataverseName = getActiveDataverse(stmtDelete.getDataverseName()); String datasetName = stmtDelete.getDatasetName().getValue(); - MetadataLockManager.INSTANCE.dropDatasetBegin(metadataProvider.getLocks(), dataverseName, + MetadataLockUtil.dropDatasetBegin(lockManager, metadataProvider.getLocks(), dataverseName, dataverseName + "." + datasetName); try { doDropDataset(dataverseName, datasetName, metadataProvider, stmtDelete.getIfExists(), hcc, true); @@ -1386,14 +1370,14 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt; String datasetName = stmtIndexDrop.getDatasetName().getValue(); String dataverseName = getActiveDataverse(stmtIndexDrop.getDataverseName()); + String indexName = stmtIndexDrop.getIndexName().getValue(); ProgressState progress = ProgressState.NO_PROGRESS; MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); boolean bActiveTxn = true; metadataProvider.setMetadataTxnContext(mdTxnCtx); List<JobSpecification> jobsToExecute = new ArrayList<>(); - MetadataLockManager.INSTANCE.dropIndexBegin(metadataProvider.getLocks(), dataverseName, - dataverseName + "." + datasetName); - String indexName = null; + String dsFullyQualifiedName = dataverseName + "." + datasetName; + MetadataLockUtil.dropIndexBegin(lockManager, metadataProvider.getLocks(), dataverseName, dsFullyQualifiedName); // For external index boolean dropFilesIndex = false; try { @@ -1402,8 +1386,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen throw new AlgebricksException( "There is no dataset with this name " + datasetName + " in dataverse " + dataverseName); } - ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener(); - ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler(); + ActiveNotificationHandler activeEventHandler = + (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); IActiveEntityEventsListener[] listeners = activeEventHandler.getEventListeners(); StringBuilder builder = null; for (IActiveEntityEventsListener listener : listeners) { @@ -1420,7 +1404,6 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } if (ds.getDatasetType() == DatasetType.INTERNAL) { - indexName = stmtIndexDrop.getIndexName().getValue(); Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName); if (index == null) { if (stmtIndexDrop.getIfExists()) { @@ -1581,7 +1564,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); metadataProvider.setMetadataTxnContext(mdTxnCtx); - MetadataLockManager.INSTANCE.dropTypeBegin(metadataProvider.getLocks(), dataverseName, + MetadataLockUtil.dropTypeBegin(lockManager, metadataProvider.getLocks(), dataverseName, dataverseName + "." + typeName); try { Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName); @@ -1606,7 +1589,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen String nodegroupName = stmtDelete.getNodeGroupName().getValue(); MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); metadataProvider.setMetadataTxnContext(mdTxnCtx); - MetadataLockManager.INSTANCE.acquireNodeGroupWriteLock(metadataProvider.getLocks(), nodegroupName); + lockManager.acquireNodeGroupWriteLock(metadataProvider.getLocks(), nodegroupName); try { NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodegroupName); if (ng == null) { @@ -1634,7 +1617,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); metadataProvider.setMetadataTxnContext(mdTxnCtx); - MetadataLockManager.INSTANCE.functionStatementBegin(metadataProvider.getLocks(), dataverse, + MetadataLockUtil.functionStatementBegin(lockManager, metadataProvider.getLocks(), dataverse, dataverse + "." + functionName); try { Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse); @@ -1662,7 +1645,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen signature.setNamespace(getActiveDataverseName(signature.getNamespace())); MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); metadataProvider.setMetadataTxnContext(mdTxnCtx); - MetadataLockManager.INSTANCE.functionStatementBegin(metadataProvider.getLocks(), signature.getNamespace(), + MetadataLockUtil.functionStatementBegin(lockManager, metadataProvider.getLocks(), signature.getNamespace(), signature.getNamespace() + "." + signature.getName()); try { Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, signature); @@ -1692,7 +1675,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); boolean bActiveTxn = true; metadataProvider.setMetadataTxnContext(mdTxnCtx); - MetadataLockManager.INSTANCE.modifyDatasetBegin(metadataProvider.getLocks(), dataverseName, + MetadataLockUtil.modifyDatasetBegin(lockManager, metadataProvider.getLocks(), dataverseName, dataverseName + "." + datasetName); try { CompiledLoadFromFileStatement cls = @@ -1723,7 +1706,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen final IMetadataLocker locker = new IMetadataLocker() { @Override public void lock() throws AsterixException { - MetadataLockManager.INSTANCE.insertDeleteUpsertBegin(metadataProvider.getLocks(), + MetadataLockUtil.insertDeleteUpsertBegin(lockManager, metadataProvider.getLocks(), dataverseName + "." + stmtInsertUpsert.getDatasetName()); } @@ -1783,7 +1766,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); boolean bActiveTxn = true; metadataProvider.setMetadataTxnContext(mdTxnCtx); - MetadataLockManager.INSTANCE.insertDeleteUpsertBegin(metadataProvider.getLocks(), + MetadataLockUtil.insertDeleteUpsertBegin(lockManager, metadataProvider.getLocks(), dataverseName + "." + stmtDelete.getDatasetName()); try { metadataProvider.setWriteTransaction(true); @@ -1860,7 +1843,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen String feedName = cfs.getFeedName().getValue(); MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); metadataProvider.setMetadataTxnContext(mdTxnCtx); - MetadataLockManager.INSTANCE.createFeedBegin(metadataProvider.getLocks(), dataverseName, + MetadataLockUtil.createFeedBegin(lockManager, metadataProvider.getLocks(), dataverseName, dataverseName + "." + feedName); Feed feed = null; try { @@ -1895,7 +1878,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen CreateFeedPolicyStatement cfps = (CreateFeedPolicyStatement) stmt; dataverse = getActiveDataverse(null); policy = cfps.getPolicyName(); - MetadataLockManager.INSTANCE.createFeedPolicyBegin(metadataProvider.getLocks(), dataverse, + MetadataLockUtil.createFeedPolicyBegin(lockManager, metadataProvider.getLocks(), dataverse, dataverse + "." + policy); try { mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); @@ -1956,7 +1939,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen String feedName = stmtFeedDrop.getFeedName().getValue(); MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); metadataProvider.setMetadataTxnContext(mdTxnCtx); - MetadataLockManager.INSTANCE.dropFeedBegin(metadataProvider.getLocks(), dataverseName, + MetadataLockUtil.dropFeedBegin(lockManager, metadataProvider.getLocks(), dataverseName, dataverseName + "." + feedName); try { Feed feed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverseName, feedName); @@ -1967,27 +1950,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); return; } - - EntityId feedId = new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName); - ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener(); - ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler(); - ActiveEntityEventsListener listener = - (ActiveEntityEventsListener) activeEventHandler.getActiveEntityListener(feedId); - if (listener != null) { - throw new AlgebricksException("Feed " + feedId - + " is currently active and connected to the following dataset(s) \n" + listener.toString()); - } else { - JobSpecification spec = FeedOperations.buildRemoveFeedStorageJob(metadataProvider, - MetadataManager.INSTANCE.getFeed(mdTxnCtx, feedId.getDataverse(), feedId.getEntityName())); - runJob(hcc, spec); - MetadataManager.INSTANCE.dropFeed(mdTxnCtx, dataverseName, feedName); - } - - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Removed feed " + feedId); - } + doDropFeed(hcc, metadataProvider, feed); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); - } catch (Exception e) { abort(e, e, mdTxnCtx); throw e; @@ -1996,13 +1960,36 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } } + protected void doDropFeed(IHyracksClientConnection hcc, MetadataProvider metadataProvider, Feed feed) + throws Exception { + MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext(); + EntityId feedId = feed.getFeedId(); + ActiveNotificationHandler activeNotificationHandler = + (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); + ActiveEntityEventsListener listener = + (ActiveEntityEventsListener) activeNotificationHandler.getListener(feedId); + if (listener != null && listener.getState() != ActivityState.STOPPED) { + throw new AlgebricksException("Feed " + feedId + + " is currently active and connected to the following dataset(s) \n" + listener.toString()); + } else if (listener != null) { + listener.unregister(); + } + JobSpecification spec = FeedOperations.buildRemoveFeedStorageJob(metadataProvider, + MetadataManager.INSTANCE.getFeed(mdTxnCtx, feedId.getDataverse(), feedId.getEntityName())); + runJob(hcc, spec); + MetadataManager.INSTANCE.dropFeed(mdTxnCtx, feed.getDataverseName(), feed.getFeedName()); + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.info("Removed feed " + feedId); + } + } + protected void handleDropFeedPolicyStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception { MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); metadataProvider.setMetadataTxnContext(mdTxnCtx); FeedPolicyDropStatement stmtFeedPolicyDrop = (FeedPolicyDropStatement) stmt; String dataverseName = getActiveDataverse(stmtFeedPolicyDrop.getDataverseName()); String policyName = stmtFeedPolicyDrop.getPolicyName().getValue(); - MetadataLockManager.INSTANCE.dropFeedPolicyBegin(metadataProvider.getLocks(), dataverseName, + MetadataLockUtil.dropFeedPolicyBegin(lockManager, metadataProvider.getLocks(), dataverseName, dataverseName + "." + policyName); try { FeedPolicyEntity feedPolicy = MetadataManager.INSTANCE.getFeedPolicy(mdTxnCtx, dataverseName, policyName); @@ -2028,56 +2015,45 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen StartFeedStatement sfs = (StartFeedStatement) stmt; String dataverseName = getActiveDataverse(sfs.getDataverseName()); String feedName = sfs.getFeedName().getValue(); - // Transcation handler MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); - metadataProvider.setMetadataTxnContext(mdTxnCtx); - // Runtime handler - EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName); - // Feed & Feed Connections - Feed feed = FeedMetadataUtil.validateIfFeedExists(dataverseName, feedName, - metadataProvider.getMetadataTxnContext()); - List<FeedConnection> feedConnections = MetadataManager.INSTANCE - .getFeedConections(metadataProvider.getMetadataTxnContext(), dataverseName, feedName); - ILangCompilationProvider compilationProvider = new AqlCompilationProvider(); - IStorageComponentProvider storageComponentProvider = new StorageComponentProvider(); - DefaultStatementExecutorFactory qtFactory = new DefaultStatementExecutorFactory(); - ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener(); - ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler(); - ActiveEntityEventsListener listener = (ActiveEntityEventsListener) activeEventHandler - .getActiveEntityListener(entityId); - if (listener != null) { - throw new AlgebricksException("Feed " + feedName + " is started already."); - } - // Start - MetadataLockManager.INSTANCE.startFeedBegin(metadataProvider.getLocks(), dataverseName, - dataverseName + "." + feedName, feedConnections); + boolean committed = false; + MetadataLockUtil.startFeedBegin(lockManager, metadataProvider.getLocks(), dataverseName, + dataverseName + "." + feedName); try { - // Prepare policy - List<IDataset> datasets = new ArrayList<>(); - for (FeedConnection connection : feedConnections) { - Dataset ds = metadataProvider.findDataset(connection.getDataverseName(), connection.getDatasetName()); - datasets.add(ds); - } - org.apache.commons.lang3.tuple.Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> jobInfo = - FeedOperations.buildStartFeedJob(sessionOutput, metadataProvider, feed, feedConnections, - compilationProvider, storageComponentProvider, qtFactory, hcc); - - JobSpecification feedJob = jobInfo.getLeft(); - listener = new ActiveEntityEventsListener(appCtx, entityId, datasets, jobInfo.getRight(), - FeedIntakeOperatorNodePushable.class.getSimpleName()); - activeEventHandler.registerListener(listener); - IActiveEventSubscriber eventSubscriber = new WaitForStateSubscriber(listener, ActivityState.STARTED); - feedJob.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId); - // TODO(Yingyi): currently we do not check IFrameWriter protocol violations for Feed jobs. - // We will need to design general exception handling mechanism for feeds. - JobUtils.runJob(hcc, feedJob, - Boolean.valueOf(metadataProvider.getConfig().get(StartFeedStatement.WAIT_FOR_COMPLETION))); - eventSubscriber.sync(); - LOGGER.log(Level.INFO, "Submitted"); + metadataProvider.setMetadataTxnContext(mdTxnCtx); + // Runtime handler + EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName); + // Feed & Feed Connections + Feed feed = FeedMetadataUtil.validateIfFeedExists(dataverseName, feedName, + metadataProvider.getMetadataTxnContext()); + List<FeedConnection> feedConnections = MetadataManager.INSTANCE + .getFeedConections(metadataProvider.getMetadataTxnContext(), dataverseName, feedName); + for (FeedConnection feedConnection : feedConnections) { + // what if the dataset is in a different dataverse + String fqName = feedConnection.getDataverseName() + "." + feedConnection.getDatasetName(); + lockManager.acquireDatasetReadLock(metadataProvider.getLocks(), fqName); + } + ActiveNotificationHandler activeEventHandler = + (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); + ActiveEntityEventsListener listener = (ActiveEntityEventsListener) activeEventHandler.getListener(entityId); + if (listener == null) { + // Prepare policy + List<Dataset> datasets = new ArrayList<>(); + for (FeedConnection connection : feedConnections) { + Dataset ds = + metadataProvider.findDataset(connection.getDataverseName(), connection.getDatasetName()); + datasets.add(ds); + } + listener = new FeedEventsListener(this, metadataProvider.getApplicationContext(), hcc, entityId, + datasets, null, FeedIntakeOperatorNodePushable.class.getSimpleName(), + NoRetryPolicyFactory.INSTANCE, feed, feedConnections); + } + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); + committed = true; + listener.start(metadataProvider); } catch (Exception e) { - abort(e, e, mdTxnCtx); - if (listener != null) { - activeEventHandler.unregisterListener(listener); + if (!committed) { + abort(e, e, mdTxnCtx); } throw e; } finally { @@ -2089,32 +2065,18 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen StopFeedStatement sfst = (StopFeedStatement) stmt; String dataverseName = getActiveDataverse(sfst.getDataverseName()); String feedName = sfst.getFeedName().getValue(); - EntityId feedId = new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName); - ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener(); - ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler(); + EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName); + ActiveNotificationHandler activeEventHandler = + (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); // Obtain runtime info from ActiveListener - ActiveEntityEventsListener listener = - (ActiveEntityEventsListener) activeEventHandler.getActiveEntityListener(feedId); + ActiveEntityEventsListener listener = (ActiveEntityEventsListener) activeEventHandler.getListener(entityId); if (listener == null) { throw new AlgebricksException("Feed " + feedName + " is not started."); } - IActiveEventSubscriber eventSubscriber = new WaitForStateSubscriber(listener, ActivityState.STOPPED); - // Transaction - MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); - metadataProvider.setMetadataTxnContext(mdTxnCtx); - MetadataLockManager.INSTANCE.stopFeedBegin(metadataProvider.getLocks(), dataverseName, feedName); + MetadataLockUtil.stopFeedBegin(lockManager, metadataProvider.getLocks(), entityId.getDataverse(), + entityId.getEntityName()); try { - // validate - FeedMetadataUtil.validateIfFeedExists(dataverseName, feedName, mdTxnCtx); - // Construct ActiveMessage - for (int i = 0; i < listener.getLocations().getLocations().length; i++) { - String intakeLocation = listener.getLocations().getLocations()[i]; - FeedOperations.SendStopMessageToNode(appCtx, feedId, intakeLocation, i); - } - eventSubscriber.sync(); - } catch (Exception e) { - abort(e, e, mdTxnCtx); - throw e; + listener.stop(metadataProvider); } finally { metadataProvider.getLocks().unlock(); } @@ -2130,20 +2092,20 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); metadataProvider.setMetadataTxnContext(mdTxnCtx); // Check whether feed is alive - ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener(); - ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler(); - if (activeEventHandler - .getActiveEntityListener(new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName)) != null) { - throw new CompilationException(ErrorCode.FEED_CHANGE_FEED_CONNECTIVITY_ON_ALIVE_FEED, feedName); - } + ActiveNotificationHandler activeEventHandler = + (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); // Transaction handling - MetadataLockManager.INSTANCE.connectFeedBegin(metadataProvider.getLocks(), dataverseName, + MetadataLockUtil.connectFeedBegin(lockManager, metadataProvider.getLocks(), dataverseName, dataverseName + "." + datasetName, dataverseName + "." + feedName); try { // validation - FeedMetadataUtil.validateIfDatasetExists(metadataProvider, dataverseName, datasetName, mdTxnCtx); + Dataset dataset = FeedMetadataUtil.validateIfDatasetExists(metadataProvider, dataverseName, datasetName); Feed feed = FeedMetadataUtil.validateIfFeedExists(dataverseName, feedName, metadataProvider.getMetadataTxnContext()); + FeedEventsListener listener = (FeedEventsListener) activeEventHandler.getListener(feed.getFeedId()); + if (listener != null && listener.isActive()) { + throw new CompilationException(ErrorCode.FEED_CHANGE_FEED_CONNECTIVITY_ON_ALIVE_FEED, feedName); + } ARecordType outputType = FeedMetadataUtil.getOutputType(feed, feed.getAdapterConfiguration(), ExternalDataConstants.KEY_TYPE_NAME); List<FunctionSignature> appliedFunctions = cfs.getAppliedFunctions(); @@ -2169,6 +2131,10 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen MetadataManager.INSTANCE.updateFunction(mdTxnCtx, func); } MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); + if (listener != null) { + listener.add(dataset); + listener.addFeedConnection(fc); + } } catch (Exception e) { abort(e, e, mdTxnCtx); throw e; @@ -2184,21 +2150,25 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen String feedName = cfs.getFeedName().getValue(); MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); metadataProvider.setMetadataTxnContext(mdTxnCtx); - ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener(); - ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler(); - // Check whether feed is alive - if (activeEventHandler - .getActiveEntityListener(new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName)) != null) { - throw new CompilationException(ErrorCode.FEED_CHANGE_FEED_CONNECTIVITY_ON_ALIVE_FEED, feedName); - } - MetadataLockManager.INSTANCE.disconnectFeedBegin(metadataProvider.getLocks(), dataverseName, + MetadataLockUtil.disconnectFeedBegin(lockManager, metadataProvider.getLocks(), dataverseName, dataverseName + "." + datasetName, dataverseName + "." + cfs.getFeedName()); try { - FeedMetadataUtil.validateIfDatasetExists(metadataProvider, dataverseName, cfs.getDatasetName().getValue(), - mdTxnCtx); + ActiveNotificationHandler activeEventHandler = + (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); + // Check whether feed is alive + ActiveEntityEventsListener listener = (ActiveEntityEventsListener) activeEventHandler + .getListener(new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName)); + if (listener != null && listener.isActive()) { + throw new CompilationException(ErrorCode.FEED_CHANGE_FEED_CONNECTIVITY_ON_ALIVE_FEED, feedName); + } + FeedMetadataUtil.validateIfDatasetExists(metadataProvider, dataverseName, cfs.getDatasetName().getValue()); FeedMetadataUtil.validateIfFeedExists(dataverseName, cfs.getFeedName().getValue(), mdTxnCtx); FeedConnection fc = MetadataManager.INSTANCE.getFeedConnection(metadataProvider.getMetadataTxnContext(), dataverseName, feedName, datasetName); + Dataset ds = metadataProvider.findDataset(dataverseName, datasetName); + if (ds == null) { + throw new CompilationException("Dataset " + dataverseName + "." + datasetName + " doesn't exist"); + } if (fc == null) { throw new CompilationException("Feed " + feedName + " is currently not connected to " + cfs.getDatasetName().getValue() + ". Invalid operation!"); @@ -2210,6 +2180,9 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen MetadataManager.INSTANCE.updateFunction(mdTxnCtx, function); } MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); + if (listener != null) { + listener.remove(ds); + } } catch (Exception e) { abort(e, e, mdTxnCtx); throw e; @@ -2227,7 +2200,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen boolean bActiveTxn = true; metadataProvider.setMetadataTxnContext(mdTxnCtx); List<JobSpecification> jobsToExecute = new ArrayList<>(); - MetadataLockManager.INSTANCE.compactBegin(metadataProvider.getLocks(), dataverseName, + MetadataLockUtil.compactBegin(lockManager, metadataProvider.getLocks(), dataverseName, dataverseName + "." + datasetName); try { Dataset ds = metadataProvider.findDataset(dataverseName, datasetName); @@ -2447,7 +2420,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); metadataProvider.setMetadataTxnContext(mdTxnCtx); - MetadataLockManager.INSTANCE.acquireNodeGroupWriteLock(metadataProvider.getLocks(), ngName); + lockManager.acquireNodeGroupWriteLock(metadataProvider.getLocks(), ngName); try { NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, ngName); if (ng != null) { @@ -2490,7 +2463,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen Dataset transactionDataset = null; boolean lockAquired = false; boolean success = false; - MetadataLockManager.INSTANCE.refreshDatasetBegin(metadataProvider.getLocks(), dataverseName, + MetadataLockUtil.refreshDatasetBegin(lockManager, metadataProvider.getLocks(), dataverseName, dataverseName + "." + datasetName); try { ds = metadataProvider.findDataset(dataverseName, datasetName); @@ -2721,7 +2694,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen DatasetUtil.isFullyQualifiedName(datasetNameTo) ? datasetNameTo : dataverseNameTo + '.' + datasetNameTo; MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); metadataProvider.setMetadataTxnContext(mdTxnCtx); - MetadataLockManager.INSTANCE.insertDeleteUpsertBegin(metadataProvider.getLocks(), fullyQualifiedDatasetNameTo); + MetadataLockUtil.insertDeleteUpsertBegin(lockManager, metadataProvider.getLocks(), fullyQualifiedDatasetNameTo); try { prepareRunExternalRuntime(metadataProvider, hcc, pregelixStmt, dataverseNameFrom, dataverseNameTo, datasetNameFrom, datasetNameTo, mdTxnCtx);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java index cd9138a..dc92f92 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java @@ -30,15 +30,14 @@ import java.util.concurrent.ConcurrentMap; import java.util.logging.Level; import java.util.logging.Logger; -import org.apache.asterix.active.ActiveLifecycleListener; import org.apache.asterix.api.http.ctx.StatementExecutorContext; +import org.apache.asterix.api.http.server.ActiveStatsApiServlet; import org.apache.asterix.api.http.server.ApiServlet; import org.apache.asterix.api.http.server.ClusterApiServlet; import org.apache.asterix.api.http.server.ClusterControllerDetailsApiServlet; import org.apache.asterix.api.http.server.ConnectorApiServlet; import org.apache.asterix.api.http.server.DdlApiServlet; import org.apache.asterix.api.http.server.DiagnosticsApiServlet; -import org.apache.asterix.api.http.server.ActiveStatsApiServlet; import org.apache.asterix.api.http.server.FullApiServlet; import org.apache.asterix.api.http.server.NodeControllerDetailsApiServlet; import org.apache.asterix.api.http.server.QueryApiServlet; @@ -52,6 +51,7 @@ import org.apache.asterix.api.http.server.ShutdownApiServlet; import org.apache.asterix.api.http.server.UpdateApiServlet; import org.apache.asterix.api.http.server.VersionApiServlet; import org.apache.asterix.api.http.servlet.ServletConstants; +import org.apache.asterix.app.active.ActiveNotificationHandler; import org.apache.asterix.app.cc.CCExtensionManager; import org.apache.asterix.app.cc.ResourceIdManager; import org.apache.asterix.app.external.ExternalLibraryUtils; @@ -65,6 +65,7 @@ import org.apache.asterix.common.config.MetadataProperties; import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.library.ILibraryManager; +import org.apache.asterix.metadata.lock.MetadataLockManager; import org.apache.asterix.common.replication.IFaultToleranceStrategy; import org.apache.asterix.common.replication.IReplicationStrategy; import org.apache.asterix.common.utils.Servlets; @@ -130,11 +131,11 @@ public class CCApplication extends BaseCCApplication { .create(ClusterProperties.INSTANCE.getCluster(), repStrategy, ccServiceCtx); ExternalLibraryUtils.setUpExternaLibraries(libraryManager, false); componentProvider = new StorageComponentProvider(); - GlobalRecoveryManager.instantiate(ccServiceCtx, getHcc(), componentProvider); + GlobalRecoveryManager globalRecoveryManager = createGlobalRecoveryManager(); statementExecutorCtx = new StatementExecutorContext(); appCtx = new CcApplicationContext(ccServiceCtx, getHcc(), libraryManager, resourceIdManager, - () -> MetadataManager.INSTANCE, GlobalRecoveryManager.instance(), ftStrategy, - new ActiveLifecycleListener(), componentProvider); + () -> MetadataManager.INSTANCE, globalRecoveryManager, ftStrategy, new ActiveNotificationHandler(), + componentProvider, new MetadataLockManager()); ClusterStateManager.INSTANCE.setCcAppCtx(appCtx); ccExtensionManager = new CCExtensionManager(getExtensions()); appCtx.setExtensionManager(ccExtensionManager); @@ -147,18 +148,22 @@ public class CCApplication extends BaseCCApplication { setAsterixStateProxy(AsterixStateProxy.registerRemoteObject(metadataProperties.getMetadataCallbackPort())); ccServiceCtx.setDistributedState(proxy); MetadataManager.initialize(proxy, metadataProperties); - ccServiceCtx.addJobLifecycleListener(appCtx.getActiveLifecycleListener()); + ccServiceCtx.addJobLifecycleListener(appCtx.getActiveNotificationHandler()); // create event loop groups webManager = new WebManager(); configureServers(); webManager.start(); - ClusterManagerProvider.getClusterManager().registerSubscriber(GlobalRecoveryManager.instance()); + ClusterManagerProvider.getClusterManager().registerSubscriber(globalRecoveryManager); ccServiceCtx.addClusterLifecycleListener(new ClusterLifecycleListener(appCtx)); jobCapacityController = new JobCapacityController(controllerService.getResourceManager()); } + protected GlobalRecoveryManager createGlobalRecoveryManager() throws Exception { + return new GlobalRecoveryManager(ccServiceCtx, getHcc(), componentProvider); + } + @Override protected void configureLoggingLevel(Level level) { super.configureLoggingLevel(level); @@ -178,7 +183,7 @@ public class CCApplication extends BaseCCApplication { @Override public void stop() throws Exception { - ((ActiveLifecycleListener) appCtx.getActiveLifecycleListener()).stop(); + ((ActiveNotificationHandler) appCtx.getActiveNotificationHandler()).stop(); if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("Stopping Asterix cluster controller"); } @@ -288,9 +293,8 @@ public class CCApplication extends BaseCCApplication { ccExtensionManager.getCompilationProvider(SQLPP), getStatementExecutorFactory(), componentProvider); case Servlets.QUERY_AQL: - return new QueryServiceServlet(ctx, paths, appCtx, AQL, - ccExtensionManager.getCompilationProvider(AQL), getStatementExecutorFactory(), - componentProvider); + return new QueryServiceServlet(ctx, paths, appCtx, AQL, ccExtensionManager.getCompilationProvider(AQL), + getStatementExecutorFactory(), componentProvider); case Servlets.CONNECTOR: return new ConnectorApiServlet(ctx, paths, appCtx); case Servlets.REBALANCE: http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java index 2a1fd0b..3209557 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java @@ -43,33 +43,29 @@ import org.apache.asterix.metadata.entities.ExternalDatasetDetails; import org.apache.asterix.metadata.entities.Index; import org.apache.asterix.metadata.utils.ExternalIndexingOperations; import org.apache.asterix.metadata.utils.MetadataConstants; -import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.hyracks.api.application.ICCServiceContext; import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; public class GlobalRecoveryManager implements IGlobalRecoveryManager { private static final Logger LOGGER = Logger.getLogger(GlobalRecoveryManager.class.getName()); - private static GlobalRecoveryManager instance; - private static ClusterState state; - private final IStorageComponentProvider componentProvider; - private final ICCServiceContext ccServiceCtx; - private IHyracksClientConnection hcc; + protected final IStorageComponentProvider componentProvider; + protected final ICCServiceContext serviceCtx; + protected IHyracksClientConnection hcc; + protected volatile boolean recoveryCompleted; - private GlobalRecoveryManager(ICCServiceContext ccServiceCtx, IHyracksClientConnection hcc, - IStorageComponentProvider componentProvider) { - setState(ClusterState.UNUSABLE); - this.ccServiceCtx = ccServiceCtx; + public GlobalRecoveryManager(ICCServiceContext serviceCtx, IHyracksClientConnection hcc, + IStorageComponentProvider componentProvider) { + this.serviceCtx = serviceCtx; this.hcc = hcc; this.componentProvider = componentProvider; } @Override public Set<IClusterManagementWork> notifyNodeFailure(Collection<String> deadNodeIds) { - setState(ClusterStateManager.INSTANCE.getState()); - ClusterStateManager.INSTANCE.setGlobalRecoveryCompleted(false); return Collections.emptySet(); } @@ -85,54 +81,59 @@ public class GlobalRecoveryManager implements IGlobalRecoveryManager { } @Override - public void startGlobalRecovery(ICcApplicationContext appCtx) { - // perform global recovery if state changed to active - final ClusterState newState = ClusterStateManager.INSTANCE.getState(); - boolean needToRecover = !newState.equals(state) && (newState == ClusterState.ACTIVE); - if (needToRecover) { - setState(newState); - ccServiceCtx.getControllerService().getExecutor().submit(() -> { - LOGGER.info("Starting Global Recovery"); - MetadataTransactionContext mdTxnCtx = null; + public void startGlobalRecovery(ICcApplicationContext appCtx) throws HyracksDataException { + if (!recoveryCompleted) { + recover(appCtx); + } + } + + protected void recover(ICcApplicationContext appCtx) throws HyracksDataException { + LOGGER.info("Starting Global Recovery"); + MetadataTransactionContext mdTxnCtx = null; + try { + MetadataManager.INSTANCE.init(); + // Loop over datasets + mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); + for (Dataverse dataverse : MetadataManager.INSTANCE.getDataverses(mdTxnCtx)) { + mdTxnCtx = recoverDataset(appCtx, mdTxnCtx, dataverse); + } + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); + } catch (Exception e) { + // This needs to be fixed <-- Needs to shutdown the system --> + /* + * Note: Throwing this illegal state exception will terminate this thread + * and feeds listeners will not be notified. + */ + LOGGER.log(Level.SEVERE, "Global recovery was not completed successfully: ", e); + if (mdTxnCtx != null) { try { - MetadataManager.INSTANCE.init(); - // Loop over datasets - mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); - for (Dataverse dataverse : MetadataManager.INSTANCE.getDataverses(mdTxnCtx)) { - mdTxnCtx = recoverDataset(appCtx, mdTxnCtx, dataverse); - } - MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); - } catch (Exception e) { - // This needs to be fixed <-- Needs to shutdown the system --> - /* - * Note: Throwing this illegal state exception will terminate this thread - * and feeds listeners will not be notified. - */ - LOGGER.log(Level.SEVERE, "Global recovery was not completed successfully: ", e); - if (mdTxnCtx != null) { - try { - MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); - } catch (Exception e1) { - LOGGER.log(Level.SEVERE, "Exception in aborting", e1); - e1.addSuppressed(e); - throw new IllegalStateException(e1); - } - } + MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); + } catch (Exception e1) { + LOGGER.log(Level.SEVERE, "Exception in aborting", e1); + e1.addSuppressed(e); + throw new IllegalStateException(e); } - ClusterStateManager.INSTANCE.setGlobalRecoveryCompleted(true); - LOGGER.info("Global Recovery Completed"); - }); + } + throw HyracksDataException.create(e); + } + recoveryCompleted = true; + LOGGER.info("Global Recovery Completed"); + } + + @Override + public void notifyStateChange(ClusterState newState) { + if (newState != ClusterState.ACTIVE) { + recoveryCompleted = false; } } private MetadataTransactionContext recoverDataset(ICcApplicationContext appCtx, MetadataTransactionContext mdTxnCtx, - Dataverse dataverse) - throws Exception { + Dataverse dataverse) throws Exception { if (!dataverse.getDataverseName().equals(MetadataConstants.METADATA_DATAVERSE_NAME)) { - MetadataProvider metadataProvider = new MetadataProvider(appCtx, dataverse, componentProvider); + MetadataProvider metadataProvider = new MetadataProvider(appCtx, dataverse); try { - List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, - dataverse.getDataverseName()); + List<Dataset> datasets = + MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dataverse.getDataverseName()); for (Dataset dataset : datasets) { if (dataset.getDatasetType() == DatasetType.EXTERNAL) { // External dataset @@ -144,8 +145,8 @@ public class GlobalRecoveryManager implements IGlobalRecoveryManager { TransactionState datasetState = dsd.getState(); if (!indexes.isEmpty()) { if (datasetState == TransactionState.BEGIN) { - List<ExternalFile> files = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, - dataset); + List<ExternalFile> files = + MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset); // if persumed abort, roll backward // 1. delete all pending files for (ExternalFile file : files) { @@ -156,8 +157,8 @@ public class GlobalRecoveryManager implements IGlobalRecoveryManager { } // 2. clean artifacts in NCs metadataProvider.setMetadataTxnContext(mdTxnCtx); - JobSpecification jobSpec = ExternalIndexingOperations.buildAbortOp(dataset, indexes, - metadataProvider); + JobSpecification jobSpec = + ExternalIndexingOperations.buildAbortOp(dataset, indexes, metadataProvider); executeHyracksJob(jobSpec); // 3. correct the dataset state ((ExternalDatasetDetails) dataset.getDatasetDetails()).setState(TransactionState.COMMIT); @@ -165,13 +166,13 @@ public class GlobalRecoveryManager implements IGlobalRecoveryManager { MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); } else if (datasetState == TransactionState.READY_TO_COMMIT) { - List<ExternalFile> files = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, - dataset); + List<ExternalFile> files = + MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset); // if ready to commit, roll forward // 1. commit indexes in NCs metadataProvider.setMetadataTxnContext(mdTxnCtx); - JobSpecification jobSpec = ExternalIndexingOperations.buildRecoverOp(dataset, indexes, - metadataProvider); + JobSpecification jobSpec = + ExternalIndexingOperations.buildRecoverOp(dataset, indexes, metadataProvider); executeHyracksJob(jobSpec); // 2. add pending files in metadata for (ExternalFile file : files) { @@ -213,20 +214,11 @@ public class GlobalRecoveryManager implements IGlobalRecoveryManager { metadataProvider.getLocks().unlock(); } } - return mdTxnCtx; } - public static GlobalRecoveryManager instance() { - return instance; - } - - public static synchronized void instantiate(ICCServiceContext ccServiceCtx, IHyracksClientConnection hcc, - IStorageComponentProvider componentProvider) { - instance = new GlobalRecoveryManager(ccServiceCtx, hcc, componentProvider); - } - - public static synchronized void setState(ClusterState state) { - GlobalRecoveryManager.state = state; + @Override + public boolean isRecoveryCompleted() { + return recoveryCompleted; } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java index 1ed37e0..4174685 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java @@ -28,18 +28,19 @@ import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.IntStream; -import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.active.IActiveEntityEventsListener; +import org.apache.asterix.app.active.ActiveNotificationHandler; +import org.apache.asterix.common.api.IMetadataLockManager; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.transactions.JobId; import org.apache.asterix.common.utils.JobUtils; import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory; import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.metadata.MetadataTransactionContext; -import org.apache.asterix.metadata.declared.MetadataManagerUtil; +import org.apache.asterix.metadata.api.IActiveEntityController; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.metadata.entities.Index; -import org.apache.asterix.metadata.lock.LockList; -import org.apache.asterix.metadata.lock.MetadataLockManager; import org.apache.asterix.metadata.utils.DatasetUtil; import org.apache.asterix.metadata.utils.IndexUtil; import org.apache.asterix.rebalance.IDatasetRebalanceCallback; @@ -119,7 +120,6 @@ public class RebalanceUtil { // The target dataset for rebalance. targetDataset = sourceDataset.getTargetDatasetForRebalance(nodeGroupName); - // Rebalances the source dataset into the target dataset. rebalance(sourceDataset, targetDataset, metadataProvider, hcc, datasetRebalanceCallback); @@ -128,8 +128,6 @@ public class RebalanceUtil { } catch (Exception e) { abort(e, e, mdTxnCtx); throw e; - } finally { - metadataProvider.getLocks().reset(); } // Up to this point, since the bulk part of a rebalance operation is done, @@ -143,7 +141,7 @@ public class RebalanceUtil { // Executes the 2nd Metadata transaction for switching the metadata entity. // It detaches the source dataset and attaches the target dataset to metadata's point of view. runMetadataTransaction(metadataProvider, - () -> rebalanceSwitch(sourceDataset, targetDataset, metadataProvider, hcc)); + () -> rebalanceSwitch(sourceDataset, targetDataset, metadataProvider, hcc)); // Executes the 3rd Metadata transaction to drop the source dataset files and the node group for // the source dataset. runMetadataTransaction(metadataProvider, () -> dropSourceDataset(sourceDataset, metadataProvider, hcc)); @@ -188,8 +186,6 @@ public class RebalanceUtil { } catch (Exception e) { abort(e, e, mdTxnCtx); throw e; - } finally { - metadataProvider.getLocks().reset(); } } @@ -219,22 +215,25 @@ public class RebalanceUtil { private static void rebalanceSwitch(Dataset source, Dataset target, MetadataProvider metadataProvider, IHyracksClientConnection hcc) throws Exception { MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext(); - - // Acquires the metadata write lock for the source/target dataset. - writeLockDataset(metadataProvider.getLocks(), source); - - Dataset sourceDataset = MetadataManagerUtil.findDataset(mdTxnCtx, source.getDataverseName(), - source.getDatasetName()); - - if (sourceDataset == null) { - // The dataset has already been dropped. - // In this case, we should drop the generated target dataset files. - dropDatasetFiles(target, metadataProvider, hcc); - return; + // upgrade lock + ICcApplicationContext appCtx = metadataProvider.getApplicationContext(); + ActiveNotificationHandler activeNotificationHandler = + (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); + IMetadataLockManager lockManager = appCtx.getMetadataLockManager(); + lockManager.upgradeDatasetLockToWrite(metadataProvider.getLocks(), DatasetUtil.getFullyQualifiedName(target)); + try { + // Updates the dataset entry in the metadata storage + MetadataManager.INSTANCE.updateDataset(mdTxnCtx, target); + for (IActiveEntityEventsListener listener : activeNotificationHandler.getEventListeners()) { + if (listener instanceof IActiveEntityController) { + IActiveEntityController controller = (IActiveEntityController) listener; + controller.replace(target); + } + } + } finally { + lockManager.downgradeDatasetLockToExclusiveModify(metadataProvider.getLocks(), + DatasetUtil.getFullyQualifiedName(target)); } - - // Updates the dataset entry in the metadata storage - MetadataManager.INSTANCE.updateDataset(mdTxnCtx, target); } // Drops the source dataset. @@ -245,12 +244,12 @@ public class RebalanceUtil { dropDatasetFiles(source, metadataProvider, hcc); // Drops the metadata entry of source dataset's node group. + ICcApplicationContext appCtx = metadataProvider.getApplicationContext(); String sourceNodeGroup = source.getNodeGroupName(); - MetadataLockManager.INSTANCE.acquireNodeGroupWriteLock(metadataProvider.getLocks(), sourceNodeGroup); + appCtx.getMetadataLockManager().acquireNodeGroupWriteLock(metadataProvider.getLocks(), sourceNodeGroup); MetadataManager.INSTANCE.dropNodegroup(metadataProvider.getMetadataTxnContext(), sourceNodeGroup, true); } - // Creates the files for the rebalance target dataset. private static void createRebalanceTarget(Dataset target, MetadataProvider metadataProvider, IHyracksClientConnection hcc) throws Exception { @@ -301,8 +300,8 @@ public class RebalanceUtil { int numKeys = source.getPrimaryKeys().size(); int numValues = source.hasMetaPart() ? 2 : 1; int[] fieldPermutation = IntStream.range(0, numKeys + numValues).toArray(); - Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> upsertOpAndConstraints = DatasetUtil - .createPrimaryIndexUpsertOp(spec, metadataProvider, target, + Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> upsertOpAndConstraints = + DatasetUtil.createPrimaryIndexUpsertOp(spec, metadataProvider, target, source.getPrimaryRecordDescriptor(metadataProvider), fieldPermutation, MissingWriterFactory.INSTANCE); IOperatorDescriptor upsertOp = upsertOpAndConstraints.first; @@ -334,13 +333,6 @@ public class RebalanceUtil { } } - // Acquires a read lock for the dataverse and a write lock for the dataset, in order to populate the dataset. - private static void writeLockDataset(LockList locks, Dataset dataset) throws AsterixException { - MetadataLockManager.INSTANCE.acquireDataverseReadLock(locks, dataset.getDataverseName()); - MetadataLockManager.INSTANCE.acquireDatasetWriteLock(locks, - dataset.getDataverseName() + "." + dataset.getDatasetName()); - } - // Creates and loads all secondary indexes for the rebalance target dataset. private static void createAndLoadSecondaryIndexesForTarget(Dataset source, Dataset target, MetadataProvider metadataProvider, IHyracksClientConnection hcc) throws Exception { @@ -349,13 +341,13 @@ public class RebalanceUtil { continue; } // Creates the secondary index. - JobSpecification indexCreationJobSpec = IndexUtil.buildSecondaryIndexCreationJobSpec(target, index, - metadataProvider); + JobSpecification indexCreationJobSpec = + IndexUtil.buildSecondaryIndexCreationJobSpec(target, index, metadataProvider); JobUtils.runJob(hcc, indexCreationJobSpec, true); // Loads the secondary index. - JobSpecification indexLoadingJobSpec = IndexUtil.buildSecondaryIndexLoadingJobSpec(target, index, - metadataProvider); + JobSpecification indexLoadingJobSpec = + IndexUtil.buildSecondaryIndexLoadingJobSpec(target, index, metadataProvider); JobUtils.runJob(hcc, indexLoadingJobSpec, true); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java index 3d6543b..5abbe40 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java @@ -30,7 +30,6 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.asterix.api.http.server.ConnectorApiServlet; import org.apache.asterix.common.dataflow.ICcApplicationContext; -import org.apache.asterix.file.StorageComponentProvider; import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.metadata.MetadataTransactionContext; import org.apache.asterix.metadata.declared.MetadataProvider; @@ -48,10 +47,8 @@ import org.apache.hyracks.api.io.FileSplit; import org.apache.hyracks.api.io.ManagedFileSplit; import org.apache.hyracks.http.api.IServletRequest; import org.apache.hyracks.http.api.IServletResponse; -import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; -import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -183,8 +180,7 @@ public class ConnectorApiServletTest { MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); // Retrieves file splits of the dataset. MetadataProvider metadataProvider = new MetadataProvider( - (ICcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext(), null, - new StorageComponentProvider()); + (ICcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext(), null); try { metadataProvider.setMetadataTxnContext(mdTxnCtx); Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java index 99892c5..c1421c5 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java @@ -227,7 +227,7 @@ public class TestNodeController { Index index = primaryIndexInfo.getIndex(); CcApplicationContext appCtx = (CcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext(); - MetadataProvider mdProvider = new MetadataProvider(appCtx, dataverse, storageComponentProvider); + MetadataProvider mdProvider = new MetadataProvider(appCtx, dataverse); try { return dataset.getResourceFactory(mdProvider, index, primaryIndexInfo.recordType, primaryIndexInfo.metaType, primaryIndexInfo.mergePolicyFactory, primaryIndexInfo.mergePolicyProperties); @@ -246,8 +246,7 @@ public class TestNodeController { Dataverse dataverse = new Dataverse(dataset.getDataverseName(), NonTaggedDataFormat.class.getName(), MetadataUtil.PENDING_NO_OP); MetadataProvider mdProvider = new MetadataProvider( - (ICcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext(), dataverse, - storageComponentProvider); + (ICcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext(), dataverse); try { IResourceFactory resourceFactory = dataset.getResourceFactory(mdProvider, primaryIndexInfo.index, recordType, metaType, mergePolicyFactory, mergePolicyProperties); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Action.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Action.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Action.java new file mode 100644 index 0000000..71cb038 --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Action.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.test.active; + +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +abstract class Action { + boolean done = false; + HyracksDataException failure; + + void execute(MetadataProvider actorMdProvider) { + try { + doExecute(actorMdProvider); + } catch (Exception e) { + failure = HyracksDataException.create(e); + } + synchronized (this) { + done = true; + notifyAll(); + } + } + + protected abstract void doExecute(MetadataProvider mdProvider) throws Exception; + + boolean hasFailed() { + return failure != null; + } + + HyracksDataException getFailure() { + return failure; + } + + synchronized void sync() throws InterruptedException { + while (!done) { + wait(); + } + } + + boolean isDone() { + return done; + } +} \ No newline at end of file
