This is an automated email from the ASF dual-hosted git repository.
dlych pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new 12c5616 [NO ISSUE][COMP] Refactor locking in the compiler
12c5616 is described below
commit 12c5616354f14ebfaa4d249702540cbf6724f30c
Author: Dmitry Lychagin <[email protected]>
AuthorDate: Tue Dec 10 13:31:53 2019 -0800
[NO ISSUE][COMP] Refactor locking in the compiler
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Introduce interface IMetadataLockUtil for methods in
  MetadataLockUtil
- Make MetadataLockUtil overridable by product extensions
- Refactor dataverse and dataset creation methods in
  QueryTranslator for better extensibility
Change-Id: I479be18ae68d9b8d42050e74968816767a454eb3
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/4424
Tested-by: Jenkins <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: Ali Alsuliman <[email protected]>
---
.../algebra/extension/ExtensionStatement.java | 8 +-
.../apache/asterix/app/active/RecoveryTask.java | 10 +-
.../asterix/app/translator/QueryTranslator.java | 158 ++++++++++++---------
.../asterix/hyracks/bootstrap/CCApplication.java | 9 +-
.../test/active/ActiveEventsListenerTest.java | 4 +
.../apache/asterix/test/active/TestUserActor.java | 16 ++-
.../common/dataflow/ICcApplicationContext.java | 6 +
.../asterix/common/metadata/IMetadataLockUtil.java | 107 ++++++++++++++
.../asterix/metadata/utils/MetadataLockUtil.java | 88 ++++++++----
.../runtime/utils/CcApplicationContext.java | 10 +-
10 files changed, 302 insertions(+), 114 deletions(-)
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/ExtensionStatement.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/ExtensionStatement.java
index 15267aa..08f330b 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/ExtensionStatement.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/ExtensionStatement.java
@@ -22,9 +22,7 @@ import org.apache.asterix.lang.common.base.AbstractStatement;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.IStatementExecutor;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
/**
* An interface that provides an extension mechanism to extend a language with
additional statements
@@ -45,10 +43,8 @@ public abstract class ExtensionStatement extends
AbstractStatement {
* @param requestParameters
* @param metadataProvider
* @param resultSetId
- * @throws HyracksDataException
- * @throws AlgebricksException
+ * @throws Exception
*/
public abstract void handle(IHyracksClientConnection hcc,
IStatementExecutor statementExecutor,
- IRequestParameters requestParameters, MetadataProvider
metadataProvider, int resultSetId)
- throws HyracksDataException, AlgebricksException;
+ IRequestParameters requestParameters, MetadataProvider
metadataProvider, int resultSetId) throws Exception;
}
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
index 0ee41cd..bd70ed4 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
@@ -28,9 +28,9 @@ import
org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.common.api.IMetadataLockManager;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.metadata.IMetadataLockUtil;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.metadata.utils.MetadataLockUtil;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.util.IRetryPolicy;
@@ -156,8 +156,9 @@ public class RecoveryTask {
}
}
IMetadataLockManager lockManager =
metadataProvider.getApplicationContext().getMetadataLockManager();
+ IMetadataLockUtil lockUtil =
metadataProvider.getApplicationContext().getMetadataLockUtil();
try {
- acquirePostRecoveryLocks(lockManager);
+ acquirePostRecoveryLocks(lockManager, lockUtil);
synchronized (listener) {
if (!cancelRecovery && listener.getState() ==
ActivityState.TEMPORARILY_FAILED) {
LOGGER.warn("Recovery for {} permanently failed",
listener.getEntityId());
@@ -187,12 +188,13 @@ public class RecoveryTask {
metadataProvider.getLocks().reset();
}
- protected void acquirePostRecoveryLocks(IMetadataLockManager lockManager)
throws AlgebricksException {
+ protected void acquirePostRecoveryLocks(IMetadataLockManager lockManager,
IMetadataLockUtil lockUtil)
+ throws AlgebricksException {
EntityId entityId = listener.getEntityId();
lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(),
entityId.getDataverseName(),
entityId.getEntityName());
for (Dataset dataset : listener.getDatasets()) {
- MetadataLockUtil.modifyDatasetBegin(lockManager,
metadataProvider.getLocks(), dataset.getDataverseName(),
+ lockUtil.modifyDatasetBegin(lockManager,
metadataProvider.getLocks(), dataset.getDataverseName(),
dataset.getDatasetName());
}
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 64a9a63..625b95a 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -78,6 +78,7 @@ import org.apache.asterix.common.exceptions.WarningCollector;
import org.apache.asterix.common.exceptions.WarningUtil;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.common.metadata.IMetadataLockUtil;
import org.apache.asterix.common.utils.JobUtils;
import org.apache.asterix.common.utils.JobUtils.ProgressState;
import org.apache.asterix.common.utils.StorageConstants;
@@ -154,7 +155,6 @@ import
org.apache.asterix.metadata.utils.ExternalIndexingOperations;
import org.apache.asterix.metadata.utils.IndexUtil;
import org.apache.asterix.metadata.utils.KeyFieldTypeUtil;
import org.apache.asterix.metadata.utils.MetadataConstants;
-import org.apache.asterix.metadata.utils.MetadataLockUtil;
import org.apache.asterix.metadata.utils.MetadataUtil;
import org.apache.asterix.om.base.IAObject;
import org.apache.asterix.om.types.ARecordType;
@@ -234,6 +234,7 @@ public class QueryTranslator extends AbstractLangTranslator
implements IStatemen
protected final ExecutorService executorService;
protected final EnumSet<JobFlag> jobFlags = EnumSet.noneOf(JobFlag.class);
protected final IMetadataLockManager lockManager;
+ protected final IMetadataLockUtil lockUtil;
protected final IResponsePrinter responsePrinter;
protected final WarningCollector warningCollector;
@@ -242,6 +243,7 @@ public class QueryTranslator extends AbstractLangTranslator
implements IStatemen
IResponsePrinter responsePrinter) {
this.appCtx = appCtx;
this.lockManager = appCtx.getMetadataLockManager();
+ this.lockUtil = appCtx.getMetadataLockUtil();
this.statements = statements;
this.sessionOutput = output;
this.sessionConfig = output.config();
@@ -481,37 +483,39 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
IRequestParameters requestParameters) throws Exception {
CreateDataverseStatement stmtCreateDataverse =
(CreateDataverseStatement) stmt;
DataverseName dvName = stmtCreateDataverse.getDataverseName();
- MetadataTransactionContext mdTxnCtx =
MetadataManager.INSTANCE.beginTransaction();
- metadataProvider.setMetadataTxnContext(mdTxnCtx);
- lockManager.acquireDataverseReadLock(metadataProvider.getLocks(),
dvName);
+ lockUtil.createDataverseBegin(lockManager,
metadataProvider.getLocks(), dvName);
try {
- doCreateDataverseStatement(mdTxnCtx, metadataProvider,
stmtCreateDataverse);
- } catch (Exception e) {
- abort(e, e, mdTxnCtx);
- throw e;
+ doCreateDataverseStatement(metadataProvider, stmtCreateDataverse);
} finally {
metadataProvider.getLocks().unlock();
}
}
@SuppressWarnings("squid:S00112")
- protected boolean doCreateDataverseStatement(MetadataTransactionContext
mdTxnCtx, MetadataProvider metadataProvider,
+ protected boolean doCreateDataverseStatement(MetadataProvider
metadataProvider,
CreateDataverseStatement stmtCreateDataverse) throws Exception {
- DataverseName dvName = stmtCreateDataverse.getDataverseName();
- Dataverse dv =
MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
dvName);
- if (dv != null) {
- if (stmtCreateDataverse.getIfNotExists()) {
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- return false;
- } else {
- throw new CompilationException(ErrorCode.DATAVERSE_EXISTS,
stmtCreateDataverse.getSourceLocation(),
- dvName);
+ MetadataTransactionContext mdTxnCtx =
MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ try {
+ DataverseName dvName = stmtCreateDataverse.getDataverseName();
+ Dataverse dv =
MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
dvName);
+ if (dv != null) {
+ if (stmtCreateDataverse.getIfNotExists()) {
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ return false;
+ } else {
+ throw new CompilationException(ErrorCode.DATAVERSE_EXISTS,
stmtCreateDataverse.getSourceLocation(),
+ dvName);
+ }
}
+
MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(),
+ new Dataverse(dvName, stmtCreateDataverse.getFormat(),
MetadataUtil.PENDING_NO_OP));
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ return true;
+ } catch (Exception e) {
+ abort(e, e, mdTxnCtx);
+ throw e;
}
-
MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(),
- new Dataverse(dvName, stmtCreateDataverse.getFormat(),
MetadataUtil.PENDING_NO_OP));
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- return true;
}
protected static void validateCompactionPolicy(String compactionPolicy,
@@ -552,13 +556,10 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
}
public void handleCreateDatasetStatement(MetadataProvider
metadataProvider, Statement stmt,
- IHyracksClientConnection hcc, IRequestParameters
requestParameters) throws CompilationException, Exception {
- MutableObject<ProgressState> progress = new
MutableObject<>(ProgressState.NO_PROGRESS);
+ IHyracksClientConnection hcc, IRequestParameters
requestParameters) throws Exception {
DatasetDecl dd = (DatasetDecl) stmt;
- SourceLocation sourceLoc = dd.getSourceLocation();
DataverseName dataverseName =
getActiveDataverseName(dd.getDataverse());
String datasetName = dd.getName().getValue();
- DatasetType dsType = dd.getDatasetType();
DataverseName itemTypeDataverseName =
getActiveDataverseName(dd.getItemTypeDataverse());
String itemTypeName = dd.getItemTypeName().getValue();
DataverseName metaItemTypeDataverseName = null;
@@ -571,16 +572,34 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
Identifier ngNameId = dd.getNodegroupName();
String nodegroupName = ngNameId == null ? null : ngNameId.getValue();
String compactionPolicy = dd.getCompactionPolicy();
+ boolean defaultCompactionPolicy = compactionPolicy == null;
+
+ lockUtil.createDatasetBegin(lockManager, metadataProvider.getLocks(),
dataverseName, datasetName,
+ itemTypeDataverseName, itemTypeName,
metaItemTypeDataverseName, metaItemTypeName, nodegroupName,
+ compactionPolicy, defaultCompactionPolicy,
dd.getDatasetDetailsDecl());
+ try {
+ doCreateDatasetStatement(metadataProvider, dd, dataverseName,
datasetName, itemTypeDataverseName,
+ itemTypeName, metaItemTypeDataverseName, metaItemTypeName,
hcc, requestParameters);
+ } finally {
+ metadataProvider.getLocks().unlock();
+ }
+ }
+
+ protected void doCreateDatasetStatement(MetadataProvider metadataProvider,
DatasetDecl dd,
+ DataverseName dataverseName, String datasetName, DataverseName
itemTypeDataverseName, String itemTypeName,
+ DataverseName metaItemTypeDataverseName, String metaItemTypeName,
IHyracksClientConnection hcc,
+ IRequestParameters requestParameters) throws Exception {
+ MutableObject<ProgressState> progress = new
MutableObject<>(ProgressState.NO_PROGRESS);
+ SourceLocation sourceLoc = dd.getSourceLocation();
+ DatasetType dsType = dd.getDatasetType();
+ Identifier ngNameId = dd.getNodegroupName();
+ String compactionPolicy = dd.getCompactionPolicy();
Map<String, String> compactionPolicyProperties =
dd.getCompactionPolicyProperties();
String compressionScheme = metadataProvider.getCompressionManager()
.getDdlOrDefaultCompressionScheme(dd.getDatasetCompressionScheme());
- boolean defaultCompactionPolicy = compactionPolicy == null;
MetadataTransactionContext mdTxnCtx =
MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- MetadataLockUtil.createDatasetBegin(lockManager,
metadataProvider.getLocks(), dataverseName, datasetName,
- itemTypeDataverseName, itemTypeName,
metaItemTypeDataverseName, metaItemTypeName, nodegroupName,
- compactionPolicy, defaultCompactionPolicy);
Dataset dataset = null;
try {
IDatasetDetails datasetDetails;
@@ -739,8 +758,6 @@ public class QueryTranslator extends AbstractLangTranslator
implements IStatemen
}
}
throw e;
- } finally {
- metadataProvider.getLocks().unlock();
}
}
@@ -796,7 +813,7 @@ public class QueryTranslator extends AbstractLangTranslator
implements IStatemen
MetadataTransactionContext mdTxnCtx =
MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
boolean isSecondaryPrimary = stmtCreateIndex.getFieldExprs().isEmpty();
- MetadataLockUtil.createIndexBegin(lockManager,
metadataProvider.getLocks(), dataverseName, datasetName);
+ lockUtil.createIndexBegin(lockManager, metadataProvider.getLocks(),
dataverseName, datasetName);
try {
Dataset ds = metadataProvider.findDataset(dataverseName,
datasetName);
if (ds == null) {
@@ -1209,7 +1226,7 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
String typeName = stmtCreateType.getIdent().getValue();
MetadataTransactionContext mdTxnCtx =
MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- MetadataLockUtil.createTypeBegin(lockManager,
metadataProvider.getLocks(), dataverseName, typeName);
+ lockUtil.createTypeBegin(lockManager, metadataProvider.getLocks(),
dataverseName, typeName);
try {
Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx,
dataverseName);
if (dv == null) {
@@ -1243,25 +1260,27 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
protected void handleDataverseDropStatement(MetadataProvider
metadataProvider, Statement stmt,
IHyracksClientConnection hcc, IRequestParameters
requestParameters) throws Exception {
- DataverseDropStatement stmtDelete = (DataverseDropStatement) stmt;
- SourceLocation sourceLoc = stmtDelete.getSourceLocation();
- DataverseName dataverseName = stmtDelete.getDataverseName();
- if
(dataverseName.equals(MetadataBuiltinEntities.DEFAULT_DATAVERSE_NAME)) {
+ DataverseDropStatement stmtDropDataverse = (DataverseDropStatement)
stmt;
+ SourceLocation sourceLoc = stmtDropDataverse.getSourceLocation();
+ DataverseName dataverseName = stmtDropDataverse.getDataverseName();
+ if
(dataverseName.equals(MetadataBuiltinEntities.DEFAULT_DATAVERSE_NAME)
+ ||
dataverseName.equals(MetadataConstants.METADATA_DATAVERSE_NAME)) {
throw new CompilationException(ErrorCode.COMPILATION_ERROR,
sourceLoc,
- MetadataBuiltinEntities.DEFAULT_DATAVERSE_NAME + "
dataverse can't be dropped");
+ dataverseName + " dataverse can't be dropped");
}
- lockManager.acquireDataverseWriteLock(metadataProvider.getLocks(),
dataverseName);
+ lockUtil.dropDataverseBegin(lockManager, metadataProvider.getLocks(),
dataverseName);
try {
- doDropDataverse(stmtDelete, sourceLoc, metadataProvider, hcc);
+ doDropDataverse(stmtDropDataverse, metadataProvider, hcc,
requestParameters);
} finally {
metadataProvider.getLocks().unlock();
ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
}
}
- protected boolean doDropDataverse(DataverseDropStatement stmtDelete,
SourceLocation sourceLoc,
- MetadataProvider metadataProvider, IHyracksClientConnection hcc)
throws Exception {
- DataverseName dataverseName = stmtDelete.getDataverseName();
+ protected boolean doDropDataverse(DataverseDropStatement
stmtDropDataverse, MetadataProvider metadataProvider,
+ IHyracksClientConnection hcc, IRequestParameters
requestParameters) throws Exception {
+ SourceLocation sourceLoc = stmtDropDataverse.getSourceLocation();
+ DataverseName dataverseName = stmtDropDataverse.getDataverseName();
ProgressState progress = ProgressState.NO_PROGRESS;
MetadataTransactionContext mdTxnCtx =
MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
@@ -1270,7 +1289,7 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
try {
Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx,
dataverseName);
if (dv == null) {
- if (stmtDelete.getIfExists()) {
+ if (stmtDropDataverse.getIfExists()) {
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
return false;
} else {
@@ -1420,17 +1439,19 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
SourceLocation sourceLoc = stmtDelete.getSourceLocation();
DataverseName dataverseName =
getActiveDataverseName(stmtDelete.getDataverseName());
String datasetName = stmtDelete.getDatasetName().getValue();
- MetadataLockUtil.dropDatasetBegin(lockManager,
metadataProvider.getLocks(), dataverseName, datasetName);
+ lockUtil.dropDatasetBegin(lockManager, metadataProvider.getLocks(),
dataverseName, datasetName);
try {
- doDropDataset(dataverseName, datasetName, metadataProvider,
stmtDelete.getIfExists(), hcc, true, sourceLoc);
+ doDropDataset(dataverseName, datasetName, metadataProvider,
stmtDelete.getIfExists(), hcc,
+ requestParameters, true, sourceLoc);
} finally {
metadataProvider.getLocks().unlock();
+
ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
}
}
public void doDropDataset(DataverseName dataverseName, String datasetName,
MetadataProvider metadataProvider,
- boolean ifExists, IHyracksClientConnection hcc, boolean
dropCorrespondingNodeGroup,
- SourceLocation sourceLoc) throws Exception {
+ boolean ifExists, IHyracksClientConnection hcc, IRequestParameters
requestParameters,
+ boolean dropCorrespondingNodeGroup, SourceLocation sourceLoc)
throws Exception {
MutableObject<ProgressState> progress = new
MutableObject<>(ProgressState.NO_PROGRESS);
MutableObject<MetadataTransactionContext> mdTxnCtx =
new
MutableObject<>(MetadataManager.INSTANCE.beginTransaction());
@@ -1484,8 +1505,6 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
}
}
throw e;
- } finally {
-
ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
}
}
@@ -1502,7 +1521,7 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
List<JobSpecification> jobsToExecute = new ArrayList<>();
- MetadataLockUtil.dropIndexBegin(lockManager,
metadataProvider.getLocks(), dataverseName, datasetName);
+ lockUtil.dropIndexBegin(lockManager, metadataProvider.getLocks(),
dataverseName, datasetName);
// For external index
boolean dropFilesIndex = false;
try {
@@ -1676,7 +1695,7 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
MetadataTransactionContext mdTxnCtx =
MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- MetadataLockUtil.dropTypeBegin(lockManager,
metadataProvider.getLocks(), dataverseName, typeName);
+ lockUtil.dropTypeBegin(lockManager, metadataProvider.getLocks(),
dataverseName, typeName);
try {
Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx,
dataverseName, typeName);
if (dt == null) {
@@ -1730,8 +1749,7 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
MetadataTransactionContext mdTxnCtx =
MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- MetadataLockUtil.createFunctionBegin(lockManager,
metadataProvider.getLocks(), dataverseName,
- signature.getName());
+ lockUtil.createFunctionBegin(lockManager, metadataProvider.getLocks(),
dataverseName, signature.getName());
try {
Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx,
dataverseName);
if (dv == null) {
@@ -1808,7 +1826,7 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
signature.setDataverseName(getActiveDataverseName(signature.getDataverseName()));
MetadataTransactionContext mdTxnCtx =
MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- MetadataLockUtil.dropFunctionBegin(lockManager,
metadataProvider.getLocks(), signature.getDataverseName(),
+ lockUtil.dropFunctionBegin(lockManager, metadataProvider.getLocks(),
signature.getDataverseName(),
signature.getName());
try {
Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx,
signature);
@@ -1839,7 +1857,7 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
MetadataTransactionContext mdTxnCtx =
MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- MetadataLockUtil.modifyDatasetBegin(lockManager,
metadataProvider.getLocks(), dataverseName, datasetName);
+ lockUtil.modifyDatasetBegin(lockManager, metadataProvider.getLocks(),
dataverseName, datasetName);
try {
CompiledLoadFromFileStatement cls =
new CompiledLoadFromFileStatement(dataverseName,
loadStmt.getDatasetName().getValue(),
@@ -1872,7 +1890,7 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
final IMetadataLocker locker = new IMetadataLocker() {
@Override
public void lock() throws AlgebricksException {
- MetadataLockUtil.insertDeleteUpsertBegin(lockManager,
metadataProvider.getLocks(), dataverseName,
+ lockUtil.insertDeleteUpsertBegin(lockManager,
metadataProvider.getLocks(), dataverseName,
stmtInsertUpsert.getDatasetName());
}
@@ -1934,7 +1952,7 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
MetadataTransactionContext mdTxnCtx =
MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- MetadataLockUtil.insertDeleteUpsertBegin(lockManager,
metadataProvider.getLocks(), dataverseName,
+ lockUtil.insertDeleteUpsertBegin(lockManager,
metadataProvider.getLocks(), dataverseName,
stmtDelete.getDatasetName());
try {
metadataProvider.setWriteTransaction(true);
@@ -2027,7 +2045,7 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
String feedName = cfs.getFeedName().getValue();
MetadataTransactionContext mdTxnCtx =
MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- MetadataLockUtil.createFeedBegin(lockManager,
metadataProvider.getLocks(), dataverseName, feedName);
+ lockUtil.createFeedBegin(lockManager, metadataProvider.getLocks(),
dataverseName, feedName);
try {
Feed feed =
MetadataManager.INSTANCE.getFeed(metadataProvider.getMetadataTxnContext(),
dataverseName, feedName);
@@ -2060,7 +2078,7 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
SourceLocation sourceLoc = cfps.getSourceLocation();
DataverseName dataverseName = getActiveDataverseName(null);
String policyName = cfps.getPolicyName();
- MetadataLockUtil.createFeedPolicyBegin(lockManager,
metadataProvider.getLocks(), dataverseName, policyName);
+ lockUtil.createFeedPolicyBegin(lockManager,
metadataProvider.getLocks(), dataverseName, policyName);
try {
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -2122,7 +2140,7 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
String feedName = stmtFeedDrop.getFeedName().getValue();
MetadataTransactionContext mdTxnCtx =
MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- MetadataLockUtil.dropFeedBegin(lockManager,
metadataProvider.getLocks(), dataverseName, feedName);
+ lockUtil.dropFeedBegin(lockManager, metadataProvider.getLocks(),
dataverseName, feedName);
try {
Feed feed = MetadataManager.INSTANCE.getFeed(mdTxnCtx,
dataverseName, feedName);
if (feed == null) {
@@ -2173,7 +2191,7 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
SourceLocation sourceLoc = stmtFeedPolicyDrop.getSourceLocation();
DataverseName dataverseName =
getActiveDataverseName(stmtFeedPolicyDrop.getDataverseName());
String policyName = stmtFeedPolicyDrop.getPolicyName().getValue();
- MetadataLockUtil.dropFeedPolicyBegin(lockManager,
metadataProvider.getLocks(), dataverseName, policyName);
+ lockUtil.dropFeedPolicyBegin(lockManager, metadataProvider.getLocks(),
dataverseName, policyName);
try {
FeedPolicyEntity feedPolicy =
MetadataManager.INSTANCE.getFeedPolicy(mdTxnCtx, dataverseName, policyName);
if (feedPolicy == null) {
@@ -2202,7 +2220,7 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
String feedName = sfs.getFeedName().getValue();
MetadataTransactionContext mdTxnCtx =
MetadataManager.INSTANCE.beginTransaction();
boolean committed = false;
- MetadataLockUtil.startFeedBegin(lockManager,
metadataProvider.getLocks(), dataverseName, feedName);
+ lockUtil.startFeedBegin(lockManager, metadataProvider.getLocks(),
dataverseName, feedName);
try {
metadataProvider.setMetadataTxnContext(mdTxnCtx);
// Runtime handler
@@ -2262,7 +2280,7 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
throw new CompilationException(ErrorCode.COMPILATION_ERROR,
sourceLoc,
"Feed " + feedName + " is not started.");
}
- MetadataLockUtil.stopFeedBegin(lockManager,
metadataProvider.getLocks(), dataverseName, feedName);
+ lockUtil.stopFeedBegin(lockManager, metadataProvider.getLocks(),
dataverseName, feedName);
try {
listener.stop(metadataProvider);
} finally {
@@ -2286,8 +2304,7 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
ActiveNotificationHandler activeEventHandler =
(ActiveNotificationHandler)
appCtx.getActiveNotificationHandler();
// Transaction handling
- MetadataLockUtil.connectFeedBegin(lockManager,
metadataProvider.getLocks(), dataverseName, datasetName,
- feedName);
+ lockUtil.connectFeedBegin(lockManager, metadataProvider.getLocks(),
dataverseName, datasetName, feedName);
try {
// validation
Dataset dataset =
FeedMetadataUtil.validateIfDatasetExists(metadataProvider, dataverseName,
datasetName);
@@ -2337,8 +2354,7 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
String feedName = cfs.getFeedName().getValue();
MetadataTransactionContext mdTxnCtx =
MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- MetadataLockUtil.disconnectFeedBegin(lockManager,
metadataProvider.getLocks(), dataverseName, datasetName,
- feedName);
+ lockUtil.disconnectFeedBegin(lockManager, metadataProvider.getLocks(),
dataverseName, datasetName, feedName);
try {
ActiveNotificationHandler activeEventHandler =
(ActiveNotificationHandler)
appCtx.getActiveNotificationHandler();
@@ -2385,7 +2401,7 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
List<JobSpecification> jobsToExecute = new ArrayList<>();
- MetadataLockUtil.compactBegin(lockManager,
metadataProvider.getLocks(), dataverseName, datasetName);
+ lockUtil.compactBegin(lockManager, metadataProvider.getLocks(),
dataverseName, datasetName);
try {
Dataset ds = metadataProvider.findDataset(dataverseName,
datasetName);
if (ds == null) {
@@ -2698,7 +2714,7 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
Dataset transactionDataset = null;
boolean lockAquired = false;
boolean success = false;
- MetadataLockUtil.refreshDatasetBegin(lockManager,
metadataProvider.getLocks(), dataverseName, datasetName);
+ lockUtil.refreshDatasetBegin(lockManager, metadataProvider.getLocks(),
dataverseName, datasetName);
try {
ds = metadataProvider.findDataset(dataverseName, datasetName);
// Dataset exists ?
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index e2fbe35..336839a 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -70,6 +70,7 @@ import org.apache.asterix.common.config.ReplicationProperties;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.library.ILibraryManager;
+import org.apache.asterix.common.metadata.IMetadataLockUtil;
import org.apache.asterix.common.replication.INcLifecycleCoordinator;
import org.apache.asterix.common.utils.Servlets;
import org.apache.asterix.external.library.ExternalLibraryManager;
@@ -79,6 +80,7 @@ import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.api.IAsterixStateProxy;
import org.apache.asterix.metadata.bootstrap.AsterixStateProxy;
import org.apache.asterix.metadata.lock.MetadataLockManager;
+import org.apache.asterix.metadata.utils.MetadataLockUtil;
import org.apache.asterix.runtime.job.resource.JobCapacityController;
import org.apache.asterix.runtime.utils.CcApplicationContext;
import org.apache.asterix.translator.IStatementExecutorFactory;
@@ -188,7 +190,8 @@ public class CCApplication extends BaseCCApplication {
CCExtensionManager ccExtensionManager) throws AlgebricksException,
IOException {
return new CcApplicationContext(ccServiceCtx, getHcc(),
libraryManager, () -> MetadataManager.INSTANCE,
globalRecoveryManager, lifecycleCoordinator, new
ActiveNotificationHandler(), componentProvider,
- new MetadataLockManager(), receptionistFactory,
configValidatorFactory, ccExtensionManager);
+ new MetadataLockManager(), createMetadataLockUtil(),
receptionistFactory, configValidatorFactory,
+ ccExtensionManager);
}
protected IGlobalRecoveryManager createGlobalRecoveryManager() throws
Exception {
@@ -199,6 +202,10 @@ public class CCApplication extends BaseCCApplication {
return new NcLifecycleCoordinator(ccServiceCtx, replicationEnabled);
}
+ protected IMetadataLockUtil createMetadataLockUtil() {
+ return new MetadataLockUtil();
+ }
+
@Override
public void configureLoggingLevel(Level level) {
super.configureLoggingLevel(level);
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
index 21b364f..ded5393 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
@@ -41,6 +41,7 @@ import org.apache.asterix.common.config.ActiveProperties;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.common.metadata.IMetadataLockUtil;
import org.apache.asterix.external.feed.watch.WaitForStateSubscriber;
import org.apache.asterix.file.StorageComponentProvider;
import org.apache.asterix.metadata.api.ICCExtensionManager;
@@ -49,6 +50,7 @@ import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.Feed;
import org.apache.asterix.metadata.lock.MetadataLockManager;
+import org.apache.asterix.metadata.utils.MetadataLockUtil;
import org.apache.asterix.runtime.functions.FunctionCollection;
import org.apache.asterix.runtime.functions.FunctionManager;
import org.apache.asterix.runtime.utils.CcApplicationContext;
@@ -94,6 +96,7 @@ public class ActiveEventsListenerTest {
static IStorageComponentProvider componentProvider;
static JobIdFactory jobIdFactory;
static IMetadataLockManager lockManager = new MetadataLockManager();
+ static IMetadataLockUtil lockUtil = new MetadataLockUtil();
static AlgebricksAbsolutePartitionConstraint locations;
static ExecutorService executor;
@@ -122,6 +125,7 @@ public class ActiveEventsListenerTest {
hcc = Mockito.mock(IHyracksClientConnection.class);
Mockito.when(appCtx.getActiveNotificationHandler()).thenReturn(handler);
Mockito.when(appCtx.getMetadataLockManager()).thenReturn(lockManager);
+ Mockito.when(appCtx.getMetadataLockUtil()).thenReturn(lockUtil);
Mockito.when(appCtx.getServiceContext()).thenReturn(ccServiceCtx);
Mockito.when(appCtx.getClusterStateManager()).thenReturn(clusterStateManager);
Mockito.when(appCtx.getActiveProperties()).thenReturn(Mockito.mock(ActiveProperties.class));
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestUserActor.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestUserActor.java
index 36f704c..1e2a795 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestUserActor.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestUserActor.java
@@ -26,22 +26,24 @@ import org.apache.asterix.common.api.IMetadataLockManager;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.common.metadata.IMetadataLockUtil;
import org.apache.asterix.metadata.api.IActiveEntityController;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.utils.DatasetUtil;
-import org.apache.asterix.metadata.utils.MetadataLockUtil;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public class TestUserActor extends Actor {
private TestClusterControllerActor clusterController;
private IMetadataLockManager lockManager;
+ private IMetadataLockUtil lockUtil;
public TestUserActor(String name, MetadataProvider metadataProvider,
TestClusterControllerActor clusterController) {
super(name, metadataProvider);
this.clusterController = clusterController;
this.lockManager =
metadataProvider.getApplicationContext().getMetadataLockManager();
+ this.lockUtil =
metadataProvider.getApplicationContext().getMetadataLockUtil();
}
public Action startActivity(IActiveEntityController actionListener) {
@@ -54,8 +56,8 @@ public class TestUserActor extends Actor {
lockManager.acquireActiveEntityWriteLock(mdProvider.getLocks(), dataverseName,
entityName);
List<Dataset> datasets = actionListener.getDatasets();
for (Dataset dataset : datasets) {
- MetadataLockUtil.modifyDatasetBegin(lockManager,
mdProvider.getLocks(),
- dataset.getDataverseName(),
dataset.getDatasetName());
+ lockUtil.modifyDatasetBegin(lockManager,
mdProvider.getLocks(), dataset.getDataverseName(),
+ dataset.getDatasetName());
}
actionListener.start(mdProvider);
} finally {
@@ -77,8 +79,8 @@ public class TestUserActor extends Actor {
lockManager.acquireActiveEntityWriteLock(mdProvider.getLocks(), dataverseName,
entityName);
List<Dataset> datasets = actionListener.getDatasets();
for (Dataset dataset : datasets) {
- MetadataLockUtil.modifyDatasetBegin(lockManager,
mdProvider.getLocks(),
- dataset.getDataverseName(),
dataset.getDatasetName());
+ lockUtil.modifyDatasetBegin(lockManager,
mdProvider.getLocks(), dataset.getDataverseName(),
+ dataset.getDatasetName());
}
actionListener.stop(mdProvider);
} finally {
@@ -197,7 +199,7 @@ public class TestUserActor extends Actor {
DataverseName dataverseName = dataset.getDataverseName();
String datasetName = dataset.getDatasetName();
try {
- MetadataLockUtil.createIndexBegin(lockManager,
mdProvider.getLocks(), dataverseName, datasetName);
+ lockUtil.createIndexBegin(lockManager,
mdProvider.getLocks(), dataverseName, datasetName);
if (actionListener.isActive()) {
throw new
RuntimeDataException(ErrorCode.CANNOT_ADD_INDEX_TO_DATASET_CONNECTED_TO_ACTIVE_ENTITY,
DatasetUtil.getFullyQualifiedDisplayName(dataverseName, datasetName) + ".index",
@@ -219,7 +221,7 @@ public class TestUserActor extends Actor {
DataverseName dataverseName = dataset.getDataverseName();
String datasetName = dataset.getDatasetName();
try {
- MetadataLockUtil.dropIndexBegin(lockManager,
mdProvider.getLocks(), dataverseName, datasetName);
+ lockUtil.dropIndexBegin(lockManager,
mdProvider.getLocks(), dataverseName, datasetName);
if (actionListener.isActive()) {
throw new RuntimeDataException(
ErrorCode.CANNOT_REMOVE_INDEX_FROM_DATASET_CONNECTED_TO_ACTIVE_ENTITY,
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
index 3389962..d764b98 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
@@ -27,6 +27,7 @@ import
org.apache.asterix.common.cluster.IGlobalRecoveryManager;
import org.apache.asterix.common.config.ExtensionProperties;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.metadata.IMetadataBootstrap;
+import org.apache.asterix.common.metadata.IMetadataLockUtil;
import org.apache.asterix.common.replication.INcLifecycleCoordinator;
import org.apache.asterix.common.storage.ICompressionManager;
import org.apache.asterix.common.transactions.IResourceIdManager;
@@ -92,6 +93,11 @@ public interface ICcApplicationContext extends
IApplicationContext {
IMetadataLockManager getMetadataLockManager();
/**
+ * @return the metadata lock utility
+ */
+ IMetadataLockUtil getMetadataLockUtil();
+
+ /**
* @return the metadata bootstrap
*/
IMetadataBootstrap getMetadataBootstrap();
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLockUtil.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLockUtil.java
new file mode 100644
index 0000000..7f02653
--- /dev/null
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLockUtil.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.common.metadata;
+
+import org.apache.asterix.common.api.IMetadataLockManager;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+
+public interface IMetadataLockUtil {
+
+ // Dataverse helpers
+
+ void createDataverseBegin(IMetadataLockManager lockManager, LockList
locks, DataverseName dataverseName)
+ throws AlgebricksException;
+
+ void dropDataverseBegin(IMetadataLockManager lockManager, LockList locks,
DataverseName dataverseName)
+ throws AlgebricksException;
+
+ // Dataset helpers
+
+ void createDatasetBegin(IMetadataLockManager lockManager, LockList locks,
DataverseName dataverseName,
+ String datasetName, DataverseName itemTypeDataverseName, String
itemTypeName,
+ DataverseName metaItemTypeDataverseName, String metaItemTypeName,
String nodeGroupName,
+ String compactionPolicyName, boolean isDefaultCompactionPolicy,
Object datasetDetails)
+ throws AlgebricksException;
+
+ void dropDatasetBegin(IMetadataLockManager lockManager, LockList locks,
DataverseName dataverseName,
+ String datasetName) throws AlgebricksException;
+
+ void modifyDatasetBegin(IMetadataLockManager lockManager, LockList locks,
DataverseName dataverseName,
+ String datasetName) throws AlgebricksException;
+
+ void refreshDatasetBegin(IMetadataLockManager lockManager, LockList locks,
DataverseName dataverseName,
+ String datasetName) throws AlgebricksException;
+
+ void compactBegin(IMetadataLockManager lockManager, LockList locks,
DataverseName dataverseName, String datasetName)
+ throws AlgebricksException;
+
+ void insertDeleteUpsertBegin(IMetadataLockManager lockManager, LockList
locks, DataverseName dataverseName,
+ String datasetName) throws AlgebricksException;
+
+ // Index helpers
+
+ void createIndexBegin(IMetadataLockManager lockManager, LockList locks,
DataverseName dataverseName,
+ String datasetName) throws AlgebricksException;
+
+ void dropIndexBegin(IMetadataLockManager lockManager, LockList locks,
DataverseName dataverseName,
+ String datasetName) throws AlgebricksException;
+
+ // Type helpers
+
+ void createTypeBegin(IMetadataLockManager lockManager, LockList locks,
DataverseName dataverseName, String typeName)
+ throws AlgebricksException;
+
+ void dropTypeBegin(IMetadataLockManager lockManager, LockList locks,
DataverseName dataverseName, String typeName)
+ throws AlgebricksException;
+
+ // Function helpers
+
+ void createFunctionBegin(IMetadataLockManager lockManager, LockList locks,
DataverseName dataverseName,
+ String functionName) throws AlgebricksException;
+
+ void dropFunctionBegin(IMetadataLockManager lockManager, LockList locks,
DataverseName dataverseName,
+ String functionName) throws AlgebricksException;
+
+ // Feed helpers
+
+ void createFeedPolicyBegin(IMetadataLockManager lockManager, LockList
locks, DataverseName dataverseName,
+ String policyName) throws AlgebricksException;
+
+ void dropFeedPolicyBegin(IMetadataLockManager lockManager, LockList locks,
DataverseName dataverseName,
+ String policyName) throws AlgebricksException;
+
+ void createFeedBegin(IMetadataLockManager lockManager, LockList locks,
DataverseName dataverseName, String feedName)
+ throws AlgebricksException;
+
+ void dropFeedBegin(IMetadataLockManager lockManager, LockList locks,
DataverseName dataverseName, String feedName)
+ throws AlgebricksException;
+
+ void startFeedBegin(IMetadataLockManager lockManager, LockList locks,
DataverseName dataverseName, String feedName)
+ throws AlgebricksException;
+
+ void stopFeedBegin(IMetadataLockManager lockManager, LockList locks,
DataverseName dataverseName, String feedName)
+ throws AlgebricksException;
+
+ void connectFeedBegin(IMetadataLockManager lockManager, LockList locks,
DataverseName dataverseName,
+ String datasetName, String feedName) throws AlgebricksException;
+
+ void disconnectFeedBegin(IMetadataLockManager lockManager, LockList locks,
DataverseName dataverseName,
+ String datasetName, String feedName) throws AlgebricksException;
+}
\ No newline at end of file
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java
index e6ab9c8..3e5ccd1 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java
@@ -20,17 +20,39 @@ package org.apache.asterix.metadata.utils;
import org.apache.asterix.common.api.IMetadataLockManager;
import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.common.metadata.IMetadataLockUtil;
import org.apache.asterix.common.metadata.LockList;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-public class MetadataLockUtil {
+public class MetadataLockUtil implements IMetadataLockUtil {
- private MetadataLockUtil() {
+ @Override
+ public void createDataverseBegin(IMetadataLockManager lockMgr, LockList
locks, DataverseName dataverseName)
+ throws AlgebricksException {
+ lockMgr.acquireDataverseReadLock(locks, dataverseName);
+ }
+
+ @Override
+ public void dropDataverseBegin(IMetadataLockManager lockMgr, LockList
locks, DataverseName dataverseName)
+ throws AlgebricksException {
+ lockMgr.acquireDataverseWriteLock(locks, dataverseName);
}
- public static void createDatasetBegin(IMetadataLockManager lockMgr,
LockList locks, DataverseName dataverseName,
+ @Override
+ public void createDatasetBegin(IMetadataLockManager lockMgr, LockList
locks, DataverseName dataverseName,
String datasetName, DataverseName itemTypeDataverseName, String
itemTypeName,
DataverseName metaItemTypeDataverseName, String metaItemTypeName,
String nodeGroupName,
+ String compactionPolicyName, boolean isDefaultCompactionPolicy,
Object datasetDetails)
+ throws AlgebricksException {
+ createDatasetBeginPre(lockMgr, locks, dataverseName,
itemTypeDataverseName, itemTypeName,
+ metaItemTypeDataverseName, metaItemTypeName, nodeGroupName,
compactionPolicyName,
+ isDefaultCompactionPolicy);
+ lockMgr.acquireDatasetWriteLock(locks, dataverseName, datasetName);
+ }
+
+ protected final void createDatasetBeginPre(IMetadataLockManager lockMgr,
LockList locks,
+ DataverseName dataverseName, DataverseName itemTypeDataverseName,
String itemTypeName,
+ DataverseName metaItemTypeDataverseName, String metaItemTypeName,
String nodeGroupName,
String compactionPolicyName, boolean isDefaultCompactionPolicy)
throws AlgebricksException {
lockMgr.acquireDataverseReadLock(locks, dataverseName);
if (!dataverseName.equals(itemTypeDataverseName)) {
@@ -51,121 +73,139 @@ public class MetadataLockUtil {
if (!isDefaultCompactionPolicy) {
lockMgr.acquireMergePolicyReadLock(locks, compactionPolicyName);
}
- lockMgr.acquireDatasetWriteLock(locks, dataverseName, datasetName);
}
- public static void createIndexBegin(IMetadataLockManager lockMgr, LockList
locks, DataverseName dataverseName,
+ @Override
+ public void createIndexBegin(IMetadataLockManager lockMgr, LockList locks,
DataverseName dataverseName,
String datasetName) throws AlgebricksException {
lockMgr.acquireDataverseReadLock(locks, dataverseName);
lockMgr.acquireDatasetCreateIndexLock(locks, dataverseName,
datasetName);
}
- public static void dropIndexBegin(IMetadataLockManager lockMgr, LockList
locks, DataverseName dataverseName,
+ @Override
+ public void dropIndexBegin(IMetadataLockManager lockMgr, LockList locks,
DataverseName dataverseName,
String datasetName) throws AlgebricksException {
lockMgr.acquireDataverseReadLock(locks, dataverseName);
lockMgr.acquireDatasetWriteLock(locks, dataverseName, datasetName);
}
- public static void createTypeBegin(IMetadataLockManager lockMgr, LockList
locks, DataverseName dataverseName,
+ @Override
+ public void createTypeBegin(IMetadataLockManager lockMgr, LockList locks,
DataverseName dataverseName,
String typeName) throws AlgebricksException {
lockMgr.acquireDataverseReadLock(locks, dataverseName);
lockMgr.acquireDataTypeWriteLock(locks, dataverseName, typeName);
}
- public static void dropDatasetBegin(IMetadataLockManager lockMgr, LockList
locks, DataverseName dataverseName,
+ @Override
+ public void dropDatasetBegin(IMetadataLockManager lockMgr, LockList locks,
DataverseName dataverseName,
String datasetName) throws AlgebricksException {
lockMgr.acquireDataverseReadLock(locks, dataverseName);
lockMgr.acquireDatasetWriteLock(locks, dataverseName, datasetName);
}
- public static void dropTypeBegin(IMetadataLockManager lockMgr, LockList
locks, DataverseName dataverseName,
+ @Override
+ public void dropTypeBegin(IMetadataLockManager lockMgr, LockList locks,
DataverseName dataverseName,
String typeName) throws AlgebricksException {
lockMgr.acquireDataverseReadLock(locks, dataverseName);
lockMgr.acquireDataTypeWriteLock(locks, dataverseName, typeName);
}
- public static void createFunctionBegin(IMetadataLockManager lockMgr,
LockList locks, DataverseName dataverseName,
+ @Override
+ public void createFunctionBegin(IMetadataLockManager lockMgr, LockList
locks, DataverseName dataverseName,
String functionName) throws AlgebricksException {
lockMgr.acquireDataverseReadLock(locks, dataverseName);
lockMgr.acquireFunctionWriteLock(locks, dataverseName, functionName);
}
- public static void dropFunctionBegin(IMetadataLockManager lockMgr,
LockList locks, DataverseName dataverseName,
+ @Override
+ public void dropFunctionBegin(IMetadataLockManager lockMgr, LockList
locks, DataverseName dataverseName,
String functionName) throws AlgebricksException {
lockMgr.acquireDataverseReadLock(locks, dataverseName);
lockMgr.acquireFunctionWriteLock(locks, dataverseName, functionName);
}
- public static void modifyDatasetBegin(IMetadataLockManager lockMgr,
LockList locks, DataverseName dataverseName,
+ @Override
+ public void modifyDatasetBegin(IMetadataLockManager lockMgr, LockList
locks, DataverseName dataverseName,
String datasetName) throws AlgebricksException {
lockMgr.acquireDataverseReadLock(locks, dataverseName);
lockMgr.acquireDatasetModifyLock(locks, dataverseName, datasetName);
}
- public static void insertDeleteUpsertBegin(IMetadataLockManager lockMgr,
LockList locks,
- DataverseName dataverseName, String datasetName) throws
AlgebricksException {
+ @Override
+ public void insertDeleteUpsertBegin(IMetadataLockManager lockMgr, LockList
locks, DataverseName dataverseName,
+ String datasetName) throws AlgebricksException {
lockMgr.acquireDataverseReadLock(locks, dataverseName);
lockMgr.acquireDatasetModifyLock(locks, dataverseName, datasetName);
}
- public static void dropFeedBegin(IMetadataLockManager lockMgr, LockList
locks, DataverseName dataverseName,
+ @Override
+ public void dropFeedBegin(IMetadataLockManager lockMgr, LockList locks,
DataverseName dataverseName,
String feedName) throws AlgebricksException {
lockMgr.acquireDataverseReadLock(locks, dataverseName);
lockMgr.acquireActiveEntityWriteLock(locks, dataverseName, feedName);
}
- public static void dropFeedPolicyBegin(IMetadataLockManager lockMgr,
LockList locks, DataverseName dataverseName,
+ @Override
+ public void dropFeedPolicyBegin(IMetadataLockManager lockMgr, LockList
locks, DataverseName dataverseName,
String policyName) throws AlgebricksException {
lockMgr.acquireDataverseReadLock(locks, dataverseName);
lockMgr.acquireActiveEntityWriteLock(locks, dataverseName, policyName);
}
- public static void startFeedBegin(IMetadataLockManager lockMgr, LockList
locks, DataverseName dataverseName,
+ @Override
+ public void startFeedBegin(IMetadataLockManager lockMgr, LockList locks,
DataverseName dataverseName,
String feedName) throws AlgebricksException {
lockMgr.acquireDataverseReadLock(locks, dataverseName);
lockMgr.acquireActiveEntityReadLock(locks, dataverseName, feedName);
}
- public static void stopFeedBegin(IMetadataLockManager lockMgr, LockList
locks, DataverseName dataverseName,
+ @Override
+ public void stopFeedBegin(IMetadataLockManager lockMgr, LockList locks,
DataverseName dataverseName,
String feedName) throws AlgebricksException {
// TODO: dataset lock?
lockMgr.acquireDataverseReadLock(locks, dataverseName);
lockMgr.acquireActiveEntityReadLock(locks, dataverseName, feedName);
}
- public static void createFeedBegin(IMetadataLockManager lockMgr, LockList
locks, DataverseName dataverseName,
+ @Override
+ public void createFeedBegin(IMetadataLockManager lockMgr, LockList locks,
DataverseName dataverseName,
String feedName) throws AlgebricksException {
lockMgr.acquireDataverseReadLock(locks, dataverseName);
lockMgr.acquireActiveEntityWriteLock(locks, dataverseName, feedName);
}
- public static void connectFeedBegin(IMetadataLockManager lockMgr, LockList
locks, DataverseName dataverseName,
+ @Override
+ public void connectFeedBegin(IMetadataLockManager lockMgr, LockList locks,
DataverseName dataverseName,
String datasetName, String feedName) throws AlgebricksException {
lockMgr.acquireDataverseReadLock(locks, dataverseName);
lockMgr.acquireActiveEntityReadLock(locks, dataverseName, feedName);
lockMgr.acquireDatasetReadLock(locks, dataverseName, datasetName);
}
- public static void createFeedPolicyBegin(IMetadataLockManager lockMgr,
LockList locks, DataverseName dataverseName,
+ @Override
+ public void createFeedPolicyBegin(IMetadataLockManager lockMgr, LockList
locks, DataverseName dataverseName,
String policyName) throws AlgebricksException {
lockMgr.acquireDataverseReadLock(locks, dataverseName);
lockMgr.acquireFeedPolicyWriteLock(locks, dataverseName, policyName);
}
- public static void disconnectFeedBegin(IMetadataLockManager lockMgr,
LockList locks, DataverseName dataverseName,
+ @Override
+ public void disconnectFeedBegin(IMetadataLockManager lockMgr, LockList
locks, DataverseName dataverseName,
String datasetName, String feedName) throws AlgebricksException {
lockMgr.acquireDataverseReadLock(locks, dataverseName);
lockMgr.acquireActiveEntityReadLock(locks, dataverseName, feedName);
lockMgr.acquireDatasetReadLock(locks, dataverseName, datasetName);
}
- public static void compactBegin(IMetadataLockManager lockMgr, LockList
locks, DataverseName dataverseName,
+ @Override
+ public void compactBegin(IMetadataLockManager lockMgr, LockList locks,
DataverseName dataverseName,
String datasetName) throws AlgebricksException {
lockMgr.acquireDataverseReadLock(locks, dataverseName);
lockMgr.acquireDatasetReadLock(locks, dataverseName, datasetName);
}
- public static void refreshDatasetBegin(IMetadataLockManager lockMgr,
LockList locks, DataverseName dataverseName,
+ @Override
+ public void refreshDatasetBegin(IMetadataLockManager lockMgr, LockList
locks, DataverseName dataverseName,
String datasetName) throws AlgebricksException {
lockMgr.acquireDataverseReadLock(locks, dataverseName);
lockMgr.acquireDatasetExclusiveModificationLock(locks, dataverseName,
datasetName);
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
index a0b10c6..3de4a88 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
@@ -47,6 +47,7 @@ import
org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.common.metadata.IMetadataBootstrap;
+import org.apache.asterix.common.metadata.IMetadataLockUtil;
import org.apache.asterix.common.replication.INcLifecycleCoordinator;
import org.apache.asterix.common.storage.ICompressionManager;
import org.apache.asterix.common.transactions.IResourceIdManager;
@@ -91,6 +92,7 @@ public class CcApplicationContext implements
ICcApplicationContext {
private INcLifecycleCoordinator ftStrategy;
private IJobLifecycleListener activeLifeCycleListener;
private IMetadataLockManager mdLockManager;
+ private IMetadataLockUtil mdLockUtil;
private IClusterStateManager clusterStateManager;
private final INodeJobTracker nodeJobTracker;
private final ITxnIdFactory txnIdFactory;
@@ -103,7 +105,7 @@ public class CcApplicationContext implements
ICcApplicationContext {
ILibraryManager libraryManager, Supplier<IMetadataBootstrap>
metadataBootstrapSupplier,
IGlobalRecoveryManager globalRecoveryManager,
INcLifecycleCoordinator ftStrategy,
IJobLifecycleListener activeLifeCycleListener,
IStorageComponentProvider storageComponentProvider,
- IMetadataLockManager mdLockManager, IReceptionistFactory
receptionistFactory,
+ IMetadataLockManager mdLockManager, IMetadataLockUtil mdLockUtil,
IReceptionistFactory receptionistFactory,
IConfigValidatorFactory configValidatorFactory, Object
extensionManager)
throws AlgebricksException, IOException {
this.ccServiceCtx = ccServiceCtx;
@@ -130,6 +132,7 @@ public class CcApplicationContext implements
ICcApplicationContext {
this.globalRecoveryManager = globalRecoveryManager;
this.storageComponentProvider = storageComponentProvider;
this.mdLockManager = mdLockManager;
+ this.mdLockUtil = mdLockUtil;
clusterStateManager = new ClusterStateManager();
clusterStateManager.setCcAppCtx(this);
this.resourceIdManager = new ResourceIdManager(clusterStateManager);
@@ -268,6 +271,11 @@ public class CcApplicationContext implements
ICcApplicationContext {
}
@Override
+ public IMetadataLockUtil getMetadataLockUtil() {
+ return mdLockUtil;
+ }
+
+ @Override
public IClusterStateManager getClusterStateManager() {
return clusterStateManager;
}