http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6eb0175f/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/Query.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/Query.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/Query.java index 2aee087..7274ae3 100644 --- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/Query.java +++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/Query.java @@ -18,7 +18,6 @@ */ package org.apache.asterix.lang.common.statement; -import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -34,21 +33,16 @@ public class Query implements IReturningStatement { private boolean topLevel = true; private Expression body; private int varCounter; - private List<String> dataverses = new ArrayList<>(); - private List<String> datasets = new ArrayList<>(); public Query(boolean explain) { this.explain = explain; } - public Query(boolean explain, boolean topLevel, Expression body, int varCounter, List<String> dataverses, - List<String> datasets) { + public Query(boolean explain, boolean topLevel, Expression body, int varCounter) { this.explain = explain; this.topLevel = topLevel; this.body = body; this.varCounter = varCounter; - this.dataverses.addAll(dataverses); - this.datasets.addAll(datasets); } @Override @@ -99,25 +93,9 @@ public class Query implements IReturningStatement { return visitor.visit(this, arg); } - public void setDataverses(List<String> dataverses) { - this.dataverses = dataverses; - } - - public void setDatasets(List<String> datasets) { - this.datasets = datasets; - } - - public List<String> getDataverses() { - return dataverses; - } - - public List<String> getDatasets() { - return datasets; - } - @Override public int hashCode() { - return ObjectUtils.hashCodeMulti(body, datasets, dataverses, topLevel, explain); + return ObjectUtils.hashCodeMulti(body, topLevel, explain); } @Override @@ -129,9 +107,7 @@ public class Query implements IReturningStatement { return false; } Query target = (Query) object; - return explain == target.explain && ObjectUtils.equals(body, target.body) - && ObjectUtils.equals(datasets, target.datasets) && ObjectUtils.equals(dataverses, target.dataverses) - && topLevel == target.topLevel; + return explain == target.explain && ObjectUtils.equals(body, target.body) && topLevel == target.topLevel; } @Override
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6eb0175f/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/CloneAndSubstituteVariablesVisitor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/CloneAndSubstituteVariablesVisitor.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/CloneAndSubstituteVariablesVisitor.java index 9b419f1..58c81a6 100644 --- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/CloneAndSubstituteVariablesVisitor.java +++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/CloneAndSubstituteVariablesVisitor.java @@ -102,7 +102,7 @@ public class CloneAndSubstituteVariablesVisitor extends if (gc.hasGroupFieldList()) { for (Pair<Expression, Identifier> varId : gc.getGroupFieldList()) { Expression newExpr = (Expression) varId.first.accept(this, env).first; - newGroupFieldList.add(new Pair<Expression, Identifier>(newExpr, varId.second)); + newGroupFieldList.add(new Pair<>(newExpr, varId.second)); } } GroupbyClause newGroup = new GroupbyClause(newGbyList, newDecorList, newWithMap, newGroupVar, newGroupFieldList, http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6eb0175f/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/SqlppGroupBySugarVisitor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/SqlppGroupBySugarVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/SqlppGroupBySugarVisitor.java index a1c17fc..6346994 100644 --- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/SqlppGroupBySugarVisitor.java +++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/SqlppGroupBySugarVisitor.java @@ -123,15 +123,14 @@ public class SqlppGroupBySugarVisitor extends AbstractSqlppExpressionScopingVisi // Reference to a field in the group variable. if (fieldVars.contains(usedVar)) { // Rewrites to a reference to a field in the group variable. - varExprMap.put(usedVar, - new FieldAccessor(fromBindingVar, SqlppVariableUtil.toUserDefinedVariableName(usedVar - .getVar()))); + varExprMap.put(usedVar, new FieldAccessor(fromBindingVar, + SqlppVariableUtil.toUserDefinedVariableName(usedVar.getVar()))); } } // Select clause. - SelectElement selectElement = new SelectElement( - SqlppRewriteUtil.substituteExpression(expr, varExprMap, context)); + SelectElement selectElement = + new SelectElement(SqlppRewriteUtil.substituteExpression(expr, varExprMap, context)); SelectClause selectClause = new SelectClause(selectElement, null, false); // Construct the select expression. http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6eb0175f/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/SqlppRewriteUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/SqlppRewriteUtil.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/SqlppRewriteUtil.java index a066110..a8c5ddc 100644 --- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/SqlppRewriteUtil.java +++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/SqlppRewriteUtil.java @@ -42,8 +42,8 @@ public class SqlppRewriteUtil { // Applying sugar rewriting for group-by. public static Expression rewriteExpressionUsingGroupVariable(VariableExpr groupVar, - Collection<VariableExpr> fieldVars, ILangExpression expr, - LangRewritingContext context) throws CompilationException { + Collection<VariableExpr> fieldVars, ILangExpression expr, LangRewritingContext context) + throws CompilationException { SqlppGroupBySugarVisitor visitor = new SqlppGroupBySugarVisitor(context, groupVar, fieldVars); return expr.accept(visitor, null); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6eb0175f/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/DeepCopyVisitor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/DeepCopyVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/DeepCopyVisitor.java index 7c2a174..37e4e26 100644 --- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/DeepCopyVisitor.java +++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/DeepCopyVisitor.java @@ -132,8 +132,7 @@ public class DeepCopyVisitor extends AbstractSqlppQueryExpressionVisitor<ILangEx @Override public Projection visit(Projection projection, Void arg) throws CompilationException { return new Projection(projection.star() ? null : (Expression) projection.getExpression().accept(this, arg), - projection.getName(), - projection.star(), projection.exprStar()); + projection.getName(), projection.star(), projection.exprStar()); } @Override @@ -233,8 +232,7 @@ public class DeepCopyVisitor extends AbstractSqlppQueryExpressionVisitor<ILangEx @Override public Query visit(Query q, Void arg) throws CompilationException { - return new Query(q.isExplain(), q.isTopLevel(), (Expression) q.getBody().accept(this, arg), q.getVarCounter(), - q.getDataverses(), q.getDatasets()); + return new Query(q.isExplain(), q.isTopLevel(), (Expression) q.getBody().accept(this, arg), q.getVarCounter()); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6eb0175f/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppCloneAndSubstituteVariablesVisitor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppCloneAndSubstituteVariablesVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppCloneAndSubstituteVariablesVisitor.java index 9579741..8904241 100644 --- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppCloneAndSubstituteVariablesVisitor.java +++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppCloneAndSubstituteVariablesVisitor.java @@ -86,8 +86,8 @@ public class SqlppCloneAndSubstituteVariablesVisitor extends CloneAndSubstituteV VariableSubstitutionEnvironment env) throws CompilationException { VariableExpr leftVar = fromTerm.getLeftVariable(); VariableExpr newLeftVar = generateNewVariable(context, leftVar); - VariableExpr newLeftPosVar = fromTerm.hasPositionalVariable() ? generateNewVariable(context, - fromTerm.getPositionalVariable()) : null; + VariableExpr newLeftPosVar = fromTerm.hasPositionalVariable() + ? generateNewVariable(context, fromTerm.getPositionalVariable()) : null; Expression newLeftExpr = (Expression) visitUnnesBindingExpression(fromTerm.getLeftExpression(), env).first; List<AbstractBinaryCorrelateClause> newCorrelateClauses = new ArrayList<>(); @@ -123,8 +123,8 @@ public class SqlppCloneAndSubstituteVariablesVisitor extends CloneAndSubstituteV VariableSubstitutionEnvironment env) throws CompilationException { VariableExpr rightVar = joinClause.getRightVariable(); VariableExpr newRightVar = generateNewVariable(context, rightVar); - VariableExpr newRightPosVar = joinClause.hasPositionalVariable() ? generateNewVariable(context, - joinClause.getPositionalVariable()) : null; + VariableExpr newRightPosVar = joinClause.hasPositionalVariable() + ? generateNewVariable(context, joinClause.getPositionalVariable()) : null; // Visits the right expression. Expression newRightExpr = (Expression) visitUnnesBindingExpression(joinClause.getRightExpression(), env).first; @@ -138,8 +138,8 @@ public class SqlppCloneAndSubstituteVariablesVisitor extends CloneAndSubstituteV // The condition can refer to the newRightVar and newRightPosVar. Expression conditionExpr = (Expression) joinClause.getConditionExpression().accept(this, currentEnv).first; - JoinClause newJoinClause = new JoinClause(joinClause.getJoinType(), newRightExpr, newRightVar, newRightPosVar, - conditionExpr); + JoinClause newJoinClause = + new JoinClause(joinClause.getJoinType(), newRightExpr, newRightVar, newRightPosVar, conditionExpr); return new Pair<>(newJoinClause, currentEnv); } @@ -148,8 +148,8 @@ public class SqlppCloneAndSubstituteVariablesVisitor extends CloneAndSubstituteV VariableSubstitutionEnvironment env) throws CompilationException { VariableExpr rightVar = nestClause.getRightVariable(); VariableExpr newRightVar = generateNewVariable(context, rightVar); - VariableExpr newRightPosVar = nestClause.hasPositionalVariable() ? generateNewVariable(context, - nestClause.getPositionalVariable()) : null; + VariableExpr newRightPosVar = nestClause.hasPositionalVariable() + ? generateNewVariable(context, nestClause.getPositionalVariable()) : null; // Visits the right expression. Expression rightExpr = (Expression) nestClause.getRightExpression().accept(this, env).first; @@ -163,8 +163,8 @@ public class SqlppCloneAndSubstituteVariablesVisitor extends CloneAndSubstituteV // The condition can refer to the newRightVar and newRightPosVar. Expression conditionExpr = (Expression) nestClause.getConditionExpression().accept(this, currentEnv).first; - NestClause newJoinClause = new NestClause(nestClause.getJoinType(), rightExpr, newRightVar, newRightPosVar, - conditionExpr); + NestClause newJoinClause = + new NestClause(nestClause.getJoinType(), rightExpr, newRightVar, newRightPosVar, conditionExpr); return new Pair<>(newJoinClause, currentEnv); } @@ -173,8 +173,8 @@ public class SqlppCloneAndSubstituteVariablesVisitor extends CloneAndSubstituteV VariableSubstitutionEnvironment env) throws CompilationException { VariableExpr rightVar = unnestClause.getRightVariable(); VariableExpr newRightVar = generateNewVariable(context, rightVar); - VariableExpr newRightPosVar = unnestClause.hasPositionalVariable() ? generateNewVariable(context, - unnestClause.getPositionalVariable()) : null; + VariableExpr newRightPosVar = unnestClause.hasPositionalVariable() + ? generateNewVariable(context, unnestClause.getPositionalVariable()) : null; // Visits the right expression. Expression rightExpr = (Expression) visitUnnesBindingExpression(unnestClause.getRightExpression(), env).first; @@ -186,8 +186,8 @@ public class SqlppCloneAndSubstituteVariablesVisitor extends CloneAndSubstituteV currentEnv.removeSubstitution(newRightPosVar); } // The condition can refer to the newRightVar and newRightPosVar. - UnnestClause newJoinClause = new UnnestClause(unnestClause.getJoinType(), rightExpr, newRightVar, - newRightPosVar); + UnnestClause newJoinClause = + new UnnestClause(unnestClause.getJoinType(), rightExpr, newRightVar, newRightPosVar); return new Pair<>(newJoinClause, currentEnv); } @@ -265,13 +265,13 @@ public class SqlppCloneAndSubstituteVariablesVisitor extends CloneAndSubstituteV VariableSubstitutionEnvironment env) throws CompilationException { boolean distinct = selectClause.distinct(); if (selectClause.selectElement()) { - Pair<ILangExpression, VariableSubstitutionEnvironment> newSelectElement = selectClause.getSelectElement() - .accept(this, env); + Pair<ILangExpression, VariableSubstitutionEnvironment> newSelectElement = + selectClause.getSelectElement().accept(this, env); return new Pair<>(new SelectClause((SelectElement) newSelectElement.first, null, distinct), newSelectElement.second); } else { - Pair<ILangExpression, VariableSubstitutionEnvironment> newSelectRegular = selectClause.getSelectRegular() - .accept(this, env); + Pair<ILangExpression, VariableSubstitutionEnvironment> newSelectRegular = + selectClause.getSelectRegular().accept(this, env); return new Pair<>(new SelectClause(null, (SelectRegular) newSelectRegular.first, distinct), newSelectRegular.second); } @@ -280,8 +280,8 @@ public class SqlppCloneAndSubstituteVariablesVisitor extends CloneAndSubstituteV @Override public Pair<ILangExpression, VariableSubstitutionEnvironment> visit(SelectElement selectElement, VariableSubstitutionEnvironment env) throws CompilationException { - Pair<ILangExpression, VariableSubstitutionEnvironment> newExpr = selectElement.getExpression() - .accept(this, env); + Pair<ILangExpression, VariableSubstitutionEnvironment> newExpr = + selectElement.getExpression().accept(this, env); return new Pair<>(new SelectElement((Expression) newExpr.first), newExpr.second); } @@ -318,12 +318,12 @@ public class SqlppCloneAndSubstituteVariablesVisitor extends CloneAndSubstituteV SetOperationInput newRightInput; SetOperationInput rightInput = right.getSetOperationRightInput(); if (rightInput.selectBlock()) { - Pair<ILangExpression, VariableSubstitutionEnvironment> rightResult = rightInput.getSelectBlock() - .accept(this, env); + Pair<ILangExpression, VariableSubstitutionEnvironment> rightResult = + rightInput.getSelectBlock().accept(this, env); newRightInput = new SetOperationInput((SelectBlock) rightResult.first, null); } else { - Pair<ILangExpression, VariableSubstitutionEnvironment> rightResult = rightInput.getSubquery() - .accept(this, env); + Pair<ILangExpression, VariableSubstitutionEnvironment> rightResult = + rightInput.getSubquery().accept(this, env); newRightInput = new SetOperationInput(null, (SelectExpression) rightResult.first); } newRightInputs.add(new SetOperationRight(right.getSetOpType(), right.isSetSemantics(), newRightInput)); @@ -392,10 +392,10 @@ public class SqlppCloneAndSubstituteVariablesVisitor extends CloneAndSubstituteV public Pair<ILangExpression, VariableSubstitutionEnvironment> visit(CaseExpression caseExpr, VariableSubstitutionEnvironment env) throws CompilationException { Expression conditionExpr = (Expression) caseExpr.getConditionExpr().accept(this, env).first; - List<Expression> whenExprList = VariableCloneAndSubstitutionUtil.visitAndCloneExprList(caseExpr.getWhenExprs(), - env, this); - List<Expression> thenExprList = VariableCloneAndSubstitutionUtil.visitAndCloneExprList(caseExpr.getThenExprs(), - env, this); + List<Expression> whenExprList = + VariableCloneAndSubstitutionUtil.visitAndCloneExprList(caseExpr.getWhenExprs(), env, this); + List<Expression> thenExprList = + VariableCloneAndSubstitutionUtil.visitAndCloneExprList(caseExpr.getThenExprs(), env, this); Expression elseExpr = (Expression) caseExpr.getElseExpr().accept(this, env).first; CaseExpression newCaseExpr = new CaseExpression(conditionExpr, whenExprList, thenExprList, elseExpr); return new Pair<>(newCaseExpr, env); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6eb0175f/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppDeleteRewriteVisitor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppDeleteRewriteVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppDeleteRewriteVisitor.java index 503437c..680ce55 100644 --- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppDeleteRewriteVisitor.java +++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppDeleteRewriteVisitor.java @@ -58,8 +58,8 @@ public class SqlppDeleteRewriteVisitor extends AbstractSqlppAstVisitor<Void, Voi : dataverseName.getValue() + "." + datasetName.getValue(); LiteralExpr argumentLiteral = new LiteralExpr(new StringLiteral(arg)); arguments.add(argumentLiteral); - CallExpr callExpression = new CallExpr(new FunctionSignature(FunctionConstants.ASTERIX_NS, "dataset", 1), - arguments); + CallExpr callExpression = + new CallExpr(new FunctionSignature(FunctionConstants.ASTERIX_NS, "dataset", 1), arguments); // From clause. VariableExpr var = deleteStmt.getVariableExpr(); @@ -84,7 +84,7 @@ public class SqlppDeleteRewriteVisitor extends AbstractSqlppAstVisitor<Void, Voi SelectBlock selectBlock = new SelectBlock(selectClause, fromClause, null, whereClause, null, null, null); SelectSetOperation selectSetOperation = new SelectSetOperation(new SetOperationInput(selectBlock, null), null); SelectExpression selectExpression = new SelectExpression(null, selectSetOperation, null, null, false); - Query query = new Query(false, false, selectExpression, 0, new ArrayList<>(), new ArrayList<>()); + Query query = new Query(false, false, selectExpression, 0); query.setBody(selectExpression); // return the delete statement. http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6eb0175f/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj index e553c4d..e6a2951 100644 --- a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj +++ b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj @@ -1029,10 +1029,6 @@ DeleteStatement DeleteStatement() throws ParseException: VariableExpr varExpr = null; Expression condition = null; Pair<Identifier, Identifier> nameComponents; - // This is related to the new metadata lock management - setDataverses(new ArrayList<String>()); - setDatasets(new ArrayList<String>()); - } { <DELETE> @@ -1040,20 +1036,13 @@ DeleteStatement DeleteStatement() throws ParseException: ((<AS>)? varExpr = Variable())? (<WHERE> condition = Expression())? { - // First we get the dataverses and datasets that we want to lock - List<String> dataverses = getDataverses(); - List<String> datasets = getDatasets(); - // we remove the pointer to the dataverses and datasets - setDataverses(null); - setDatasets(null); - if(varExpr == null){ varExpr = new VariableExpr(); VarIdentifier var = SqlppVariableUtil.toInternalVariableIdentifier(nameComponents.second.getValue()); varExpr.setVar(var); } return new DeleteStatement(varExpr, nameComponents.first, nameComponents.second, - condition, getVarCounter(), dataverses, datasets); + condition, getVarCounter()); } } @@ -1738,9 +1727,6 @@ Query ExplainStatement() throws ParseException: Query Query(boolean explain) throws ParseException: { Query query = new Query(explain); - // we set the pointers to the dataverses and datasets lists to fill them with entities to be locked - setDataverses(query.getDataverses()); - setDatasets(query.getDatasets()); Expression expr; } { @@ -1751,9 +1737,6 @@ Query Query(boolean explain) throws ParseException: ) { query.setBody(expr); - // we remove the pointers to the locked entities before we return the query object - setDataverses(null); - setDatasets(null); return query; } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6eb0175f/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java index 5d44d0b..b2c2e81 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java @@ -400,7 +400,7 @@ public class MetadataManager implements IMetadataManager { ARecordType aRecType = (ARecordType) datatype.getDatatype(); return new Datatype( datatype.getDataverseName(), datatype.getDatatypeName(), new ARecordType(aRecType.getTypeName(), - aRecType.getFieldNames(), aRecType.getFieldTypes(), aRecType.isOpen()), + aRecType.getFieldNames(), aRecType.getFieldTypes(), aRecType.isOpen()), datatype.getIsAnonymous()); } try { @@ -917,7 +917,7 @@ public class MetadataManager implements IMetadataManager { @Override public ExternalFile getExternalFile(MetadataTransactionContext ctx, String dataverseName, String datasetName, - Integer fileNumber) throws MetadataException { + Integer fileNumber) throws MetadataException { ExternalFile file; try { file = metadataNode.getExternalFile(ctx.getJobId(), dataverseName, datasetName, fileNumber); @@ -985,8 +985,7 @@ public class MetadataManager implements IMetadataManager { @Override public <T extends IExtensionMetadataEntity> List<T> getEntities(MetadataTransactionContext mdTxnCtx, - IExtensionMetadataSearchKey searchKey) - throws MetadataException { + IExtensionMetadataSearchKey searchKey) throws MetadataException { try { return metadataNode.getEntities(mdTxnCtx.getJobId(), searchKey); } catch (RemoteException e) { @@ -1021,14 +1020,14 @@ public class MetadataManager implements IMetadataManager { return; } try { - metadataNode = proxy.waitForMetadataNode(metadataProperties.getRegistrationTimeoutSecs(), - TimeUnit.SECONDS); + metadataNode = + proxy.waitForMetadataNode(metadataProperties.getRegistrationTimeoutSecs(), TimeUnit.SECONDS); if (metadataNode != null) { rebindMetadataNode = false; } else { throw new HyracksDataException("The MetadataNode failed to bind before the configured timeout (" - + metadataProperties.getRegistrationTimeoutSecs() + " seconds); the MetadataNode was " + - "configured to run on NC: " + metadataProperties.getMetadataNodeName()); + + metadataProperties.getRegistrationTimeoutSecs() + " seconds); the MetadataNode was " + + "configured to run on NC: " + metadataProperties.getMetadataNodeName()); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6eb0175f/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java index bd1c7d1..8ce53d0 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java @@ -695,11 +695,10 @@ public interface IMetadataManager extends IMetadataBootstrap { /** * Feed Connection Related Metadata operations */ - void addFeedConnection(MetadataTransactionContext ctx, FeedConnection feedConnection) - throws MetadataException; + void addFeedConnection(MetadataTransactionContext ctx, FeedConnection feedConnection) throws MetadataException; - void dropFeedConnection(MetadataTransactionContext ctx, String dataverseName, String feedName, - String datasetName) throws MetadataException; + void dropFeedConnection(MetadataTransactionContext ctx, String dataverseName, String feedName, String datasetName) + throws MetadataException; FeedConnection getFeedConnection(MetadataTransactionContext ctx, String dataverseName, String feedName, String datasetName) throws MetadataException; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6eb0175f/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeDataflowHelperFactoryProvider.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeDataflowHelperFactoryProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeDataflowHelperFactoryProvider.java index 0560bd0..f3913e1 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeDataflowHelperFactoryProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeDataflowHelperFactoryProvider.java @@ -28,7 +28,7 @@ import org.apache.asterix.external.indexing.IndexingConstants; import org.apache.asterix.metadata.api.IIndexDataflowHelperFactoryProvider; import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.metadata.entities.Index; -import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry; +import org.apache.asterix.metadata.lock.ExternalDatasetsRegistry; import org.apache.asterix.metadata.utils.IndexUtil; import org.apache.asterix.om.types.ARecordType; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6eb0175f/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java index f8f4a0e..aa76122 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java @@ -71,6 +71,8 @@ import org.apache.asterix.metadata.entities.FeedPolicyEntity; import org.apache.asterix.metadata.entities.Index; import org.apache.asterix.metadata.entities.InternalDatasetDetails; import org.apache.asterix.metadata.feeds.FeedMetadataUtil; +import org.apache.asterix.metadata.lock.LockList; +import org.apache.asterix.metadata.lock.MetadataLockManager; import org.apache.asterix.metadata.utils.DatasetUtil; import org.apache.asterix.metadata.utils.MetadataConstants; import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil; @@ -150,6 +152,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> private final StorageProperties storageProperties; private final ILibraryManager libraryManager; private final Dataverse defaultDataverse; + private final LockList locks; private MetadataTransactionContext mdTxnCtx; private boolean isWriteTransaction; @@ -160,7 +163,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> private ResultSetId resultSetId; private IResultSerializerFactoryProvider resultSerializerFactoryProvider; private JobId jobId; - private Map<String, Integer> locks; + private Map<String, Integer> externalDataLocks; private boolean isTemporaryDatasetWriteJob = true; private boolean blockingOperatorDisabled = false; @@ -171,6 +174,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> libraryManager = AppContextInfo.INSTANCE.getLibraryManager(); metadataPageManagerFactory = componentProvider.getMetadataPageManagerFactory(); primitiveValueProviderFactory = componentProvider.getPrimitiveValueProviderFactory(); + locks = new LockList(); } public String getPropertyValue(String propertyName) { @@ -280,12 +284,12 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> return storageProperties; } - public Map<String, Integer> getLocks() { - return locks; + public Map<String, Integer> getExternalDataLocks() { + return externalDataLocks; } - public void setLocks(Map<String, Integer> locks) { - this.locks = locks; + public void setExternalDataLocks(Map<String, Integer> locks) { + this.externalDataLocks = locks; } /** @@ -302,6 +306,9 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> if (dv == null) { return null; } + String fqName = dv + '.' + dataset; + MetadataLockManager.INSTANCE.acquireDataverseReadLock(locks, dv); + MetadataLockManager.INSTANCE.acquireDatasetReadLock(locks, fqName); return MetadataManagerUtil.findDataset(mdTxnCtx, dv, dataset); } @@ -917,7 +924,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> public FileSplit[] splitsForDataset(MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName, String targetIdxName, boolean temp) throws AlgebricksException { - return SplitsAndConstraintsUtil.getDatasetSplits(mdTxnCtx, dataverseName, datasetName, targetIdxName, temp); + return SplitsAndConstraintsUtil.getDatasetSplits(findDataset(dataverseName, datasetName), mdTxnCtx, + targetIdxName, temp); } public DatasourceAdapter getAdapter(MetadataTransactionContext mdTxnCtx, String dataverseName, String adapterName) @@ -939,8 +947,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForFilesIndex( String dataverseName, String datasetName, String targetIdxName, boolean create) throws AlgebricksException { - return SplitsAndConstraintsUtil.getFilesIndexSplitProviderAndConstraints(mdTxnCtx, dataverseName, datasetName, - targetIdxName, create); + return SplitsAndConstraintsUtil.getFilesIndexSplitProviderAndConstraints( + findDataset(dataverseName, datasetName), mdTxnCtx, targetIdxName, create); } public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataLookupRuntime( @@ -1827,16 +1835,14 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> splitsAndConstraint.first, appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories, tokenizerFactory, fieldPermutation, indexDataFlowFactory, filterFactory, modificationCallbackFactory, - searchCallbackFactory, indexName, - prevFieldPermutation, metadataPageManagerFactory); + searchCallbackFactory, indexName, prevFieldPermutation, metadataPageManagerFactory); } else { op = new LSMInvertedIndexInsertDeleteOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(), splitsAndConstraint.first, appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories, tokenizerFactory, fieldPermutation, indexOp, indexDataFlowFactory, filterFactory, modificationCallbackFactory, searchCallbackFactory, - indexName, - metadataPageManagerFactory); + indexName, metadataPageManagerFactory); } return new Pair<>(op, splitsAndConstraint.second); } catch (Exception e) { @@ -2064,4 +2070,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> ds.getDatasetDetails().isTemp()); return StoragePathUtil.splitProviderAndPartitionConstraints(splits); } + + public LockList getLocks() { + return locks; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6eb0175f/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java index e0607a6..572130d 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java @@ -50,8 +50,8 @@ import org.apache.asterix.metadata.MetadataTransactionContext; import org.apache.asterix.metadata.api.IMetadataEntity; import org.apache.asterix.metadata.declared.BTreeDataflowHelperFactoryProvider; import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.metadata.lock.ExternalDatasetsRegistry; import org.apache.asterix.metadata.utils.DatasetUtil; -import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry; import org.apache.asterix.metadata.utils.ExternalIndexingOperations; import org.apache.asterix.metadata.utils.IndexUtil; import org.apache.asterix.metadata.utils.InvertedIndexDataflowHelperFactoryProvider; @@ -283,9 +283,7 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset { // prepare job spec(s) that would disconnect any active feeds involving the dataset. IActiveEntityEventsListener[] activeListeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners(); for (IActiveEntityEventsListener listener : activeListeners) { - IDataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), - dataverseName, datasetName); - if (listener.isEntityUsingDataset(ds)) { + if (listener.isEntityUsingDataset(this)) { throw new CompilationException(ErrorCode.COMPILATION_CANT_DROP_ACTIVE_DATASET, RecordUtil.toFullyQualifiedName(dataverseName, datasetName), listener.getEntityId().toString()); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6eb0175f/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java index 45e358f..584aead 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java @@ -39,6 +39,7 @@ import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; import org.apache.asterix.metadata.MetadataException; import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.metadata.MetadataTransactionContext; +import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.metadata.entities.DatasourceAdapter; import org.apache.asterix.metadata.entities.Datatype; @@ -47,7 +48,6 @@ import org.apache.asterix.metadata.entities.FeedPolicyEntity; import org.apache.asterix.metadata.utils.MetadataConstants; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.ATypeTag; -import org.apache.asterix.om.types.IAType; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.Triple; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; @@ -59,17 +59,16 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor; */ public class FeedMetadataUtil { - public static Dataset validateIfDatasetExists(String dataverse, String datasetName, MetadataTransactionContext ctx) - throws CompilationException { - Dataset dataset = MetadataManager.INSTANCE.getDataset(ctx, dataverse, datasetName); + public static Dataset validateIfDatasetExists(MetadataProvider metadataProvider, String dataverse, + String datasetName, MetadataTransactionContext ctx) throws AlgebricksException { + Dataset dataset = metadataProvider.findDataset(dataverse, datasetName); if (dataset == null) { throw new CompilationException("Unknown target dataset :" + datasetName); } if (!dataset.getDatasetType().equals(DatasetType.INTERNAL)) { throw new CompilationException("Statement not applicable. Dataset " + datasetName - + " is not of required type " - + DatasetType.INTERNAL); + + " is not of required type " + DatasetType.INTERNAL); } return dataset; } @@ -87,8 +86,8 @@ public class FeedMetadataUtil { MetadataTransactionContext ctx) throws CompilationException { FeedPolicyEntity feedPolicy = MetadataManager.INSTANCE.getFeedPolicy(ctx, dataverse, policyName); if (feedPolicy == null) { - feedPolicy = MetadataManager.INSTANCE.getFeedPolicy(ctx, MetadataConstants.METADATA_DATAVERSE_NAME, - policyName); + feedPolicy = + MetadataManager.INSTANCE.getFeedPolicy(ctx, MetadataConstants.METADATA_DATAVERSE_NAME, policyName); if (feedPolicy == null) { throw new CompilationException("Unknown feed policy" + policyName); } @@ -155,7 +154,7 @@ public class FeedMetadataUtil { } } } catch (Exception e) { - throw new AsterixException("Invalid feed parameters. Exception Message:" + e.getMessage() , e); + throw new AsterixException("Invalid feed parameters. Exception Message:" + e.getMessage(), e); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6eb0175f/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java new file mode 100644 index 0000000..ffcff78 --- /dev/null +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.metadata.lock; + +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.asterix.metadata.lock.IMetadataLock.Mode; +import org.apache.asterix.om.base.AMutableInt32; + +public class DatasetLock implements IMetadataLock { + + private ReentrantReadWriteLock dsLock; + private ReentrantReadWriteLock dsModifyLock; + private AMutableInt32 indexBuildCounter; + + public DatasetLock() { + dsLock = new ReentrantReadWriteLock(true); + dsModifyLock = new ReentrantReadWriteLock(true); + indexBuildCounter = new AMutableInt32(0); + } + + private void acquireReadLock() { + // query + // build index + // insert + dsLock.readLock().lock(); + } + + private void releaseReadLock() { + // query + // build index + // insert + dsLock.readLock().unlock(); + } + + private void acquireWriteLock() { + // create ds + // delete ds + // drop index + dsLock.writeLock().lock(); + } + + private void releaseWriteLock() { + // create ds + // delete ds + // drop index + dsLock.writeLock().unlock(); + } + + private void acquireReadModifyLock() { + // insert + dsModifyLock.readLock().lock(); + } + + private void releaseReadModifyLock() { + // insert + dsModifyLock.readLock().unlock(); + } + + private void acquireWriteModifyLock() { + // Build index statement + synchronized (indexBuildCounter) { + if (indexBuildCounter.getIntegerValue() > 0) { + indexBuildCounter.setValue(indexBuildCounter.getIntegerValue() + 1); + } else { + dsModifyLock.writeLock().lock(); + indexBuildCounter.setValue(1); + } + } + } + + private void releaseWriteModifyLock() { + // Build index statement + synchronized (indexBuildCounter) { + if (indexBuildCounter.getIntegerValue() == 1) { + dsModifyLock.writeLock().unlock(); + } + indexBuildCounter.setValue(indexBuildCounter.getIntegerValue() - 1); + } + } + + private void acquireRefreshLock() { + // Refresh External Dataset statement + dsModifyLock.writeLock().lock(); + } + + private void releaseRefreshLock() { + // Refresh External Dataset statement + dsModifyLock.writeLock().unlock(); + } + + @Override + public void acquire(IMetadataLock.Mode mode) { + switch (mode) { + case INDEX_BUILD: + acquireReadLock(); + acquireWriteModifyLock(); + break; + case MODIFY: + acquireReadLock(); + acquireReadModifyLock(); + break; + case REFRESH: + acquireReadLock(); + acquireRefreshLock(); + break; + case INDEX_DROP: + case WRITE: + acquireWriteLock(); + break; + default: + acquireReadLock(); + break; + } + } + + @Override + public void release(IMetadataLock.Mode mode) { + switch (mode) { + case INDEX_BUILD: + releaseWriteModifyLock(); + releaseReadLock(); + break; + case MODIFY: + releaseReadModifyLock(); + releaseReadLock(); + break; + case REFRESH: + releaseRefreshLock(); + releaseReadLock(); + break; + case INDEX_DROP: + case WRITE: + releaseWriteLock(); + break; + default: + releaseReadLock(); + break; + } + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6eb0175f/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/ExternalDatasetsRegistry.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/ExternalDatasetsRegistry.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/ExternalDatasetsRegistry.java new file mode 100644 index 0000000..4d8bacf --- /dev/null +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/ExternalDatasetsRegistry.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.metadata.lock; + +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.metadata.entities.Dataset; +import org.apache.asterix.metadata.utils.ExternalDatasetAccessManager; + +/** + * This is a singelton class used to maintain the version of each external dataset with indexes + * It should be consolidated once a better global dataset lock management is introduced. + * + * @author alamouda + */ +public class ExternalDatasetsRegistry { + public static final ExternalDatasetsRegistry INSTANCE = new ExternalDatasetsRegistry(); + private final ConcurrentHashMap<String, ExternalDatasetAccessManager> globalRegister; + + private ExternalDatasetsRegistry() { + globalRegister = new ConcurrentHashMap<>(); + } + + /** + * Get the current version of the dataset + * + * @param dataset + * @return + */ + public int getDatasetVersion(Dataset dataset) { + String key = dataset.getDataverseName() + "." + dataset.getDatasetName(); + ExternalDatasetAccessManager datasetAccessMgr = globalRegister.get(key); + if (datasetAccessMgr == null) { + globalRegister.putIfAbsent(key, new ExternalDatasetAccessManager()); + datasetAccessMgr = globalRegister.get(key); + } + return datasetAccessMgr.getVersion(); + } + + public int getAndLockDatasetVersion(Dataset dataset, MetadataProvider metadataProvider) { + + Map<String, Integer> locks; + String lockKey = dataset.getDataverseName() + "." + dataset.getDatasetName(); + // check first if the lock was aquired already + locks = metadataProvider.getExternalDataLocks(); + if (locks == null) { + locks = new HashMap<>(); + metadataProvider.setExternalDataLocks(locks); + } else { + // if dataset was accessed already by this job, return the registered version + Integer version = locks.get(lockKey); + if (version != null) { + return version; + } + } + + ExternalDatasetAccessManager datasetAccessMgr = globalRegister.get(lockKey); + if (datasetAccessMgr == null) { + globalRegister.putIfAbsent(lockKey, new ExternalDatasetAccessManager()); + datasetAccessMgr = globalRegister.get(lockKey); + } + + // aquire the correct lock + int version = datasetAccessMgr.queryBegin(); + locks.put(lockKey, version); + return version; + } + + public void refreshBegin(Dataset dataset) { + String key = dataset.getDataverseName() + "." + dataset.getDatasetName(); + ExternalDatasetAccessManager datasetAccessMgr = globalRegister.get(key); + if (datasetAccessMgr == null) { + datasetAccessMgr = globalRegister.put(key, new ExternalDatasetAccessManager()); + } + // aquire the correct lock + datasetAccessMgr.refreshBegin(); + } + + public void removeDatasetInfo(Dataset dataset) { + String key = dataset.getDataverseName() + "." + dataset.getDatasetName(); + globalRegister.remove(key); + } + + public void refreshEnd(Dataset dataset, boolean success) { + String key = dataset.getDataverseName() + "." + dataset.getDatasetName(); + globalRegister.get(key).refreshEnd(success); + } + + public void buildIndexBegin(Dataset dataset, boolean firstIndex) { + String key = dataset.getDataverseName() + "." + dataset.getDatasetName(); + ExternalDatasetAccessManager datasetAccessMgr = globalRegister.get(key); + if (datasetAccessMgr == null) { + globalRegister.putIfAbsent(key, new ExternalDatasetAccessManager()); + datasetAccessMgr = globalRegister.get(key); + } + // aquire the correct lock + datasetAccessMgr.buildIndexBegin(firstIndex); + } + + public void buildIndexEnd(Dataset dataset, boolean firstIndex) { + String key = dataset.getDataverseName() + "." + dataset.getDatasetName(); + globalRegister.get(key).buildIndexEnd(firstIndex); + } + + public void releaseAcquiredLocks(MetadataProvider metadataProvider) { + Map<String, Integer> locks = metadataProvider.getExternalDataLocks(); + if (locks == null) { + return; + } else { + // if dataset was accessed already by this job, return the registered version + Set<Entry<String, Integer>> aquiredLocks = locks.entrySet(); + for (Entry<String, Integer> entry : aquiredLocks) { + ExternalDatasetAccessManager accessManager = globalRegister.get(entry.getKey()); + if (accessManager != null) { + accessManager.queryEnd(entry.getValue()); + } + } + locks.clear(); + } + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6eb0175f/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/IMetadataLock.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/IMetadataLock.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/IMetadataLock.java new file mode 100644 index 0000000..8af29ba --- /dev/null +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/IMetadataLock.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.metadata.lock; + +/** + * A Metadata lock local to compilation node + */ +public interface IMetadataLock { + + enum Mode { + READ, + MODIFY, + REFRESH, + INDEX_BUILD, + INDEX_DROP, + WRITE + } + + /** + * Acquire a lock + * + * @param mode + * lock mode + */ + void acquire(IMetadataLock.Mode mode); + + /** + * Release a lock + * + * @param mode + * lock mode + */ + void release(IMetadataLock.Mode mode); +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6eb0175f/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/LockList.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/LockList.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/LockList.java new file mode 100644 index 0000000..6e6f086 --- /dev/null +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/LockList.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.metadata.lock; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.asterix.metadata.lock.IMetadataLock.Mode; +import org.apache.commons.lang3.tuple.Pair; + +public class LockList { + List<Pair<IMetadataLock.Mode, IMetadataLock>> locks = new ArrayList<>(); + + public void add(IMetadataLock.Mode mode, IMetadataLock lock) { + lock.acquire(mode); + locks.add(Pair.of(mode, lock)); + } + + public void unlock() { + for (int i = locks.size() - 1; i >= 0; i--) { + Pair<IMetadataLock.Mode, IMetadataLock> pair = locks.get(i); + pair.getRight().release(pair.getLeft()); + } + } + + public void reset() { + locks.clear(); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6eb0175f/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLock.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLock.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLock.java new file mode 100644 index 0000000..c8fd64f --- /dev/null +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLock.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.metadata.lock; + +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.asterix.metadata.lock.IMetadataLock.Mode; + +public class MetadataLock implements IMetadataLock { + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); + + @Override + public void acquire(IMetadataLock.Mode mode) { + switch (mode) { + case WRITE: + lock.writeLock().lock(); + break; + default: + lock.readLock().lock(); + break; + } + } + + @Override + public void release(IMetadataLock.Mode mode) { + switch (mode) { + case WRITE: + lock.writeLock().unlock(); + break; + default: + lock.readLock().unlock(); + break; + } + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6eb0175f/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLockManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLockManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLockManager.java new file mode 100644 index 0000000..5ae3aa4 --- /dev/null +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLockManager.java @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.metadata.lock; + +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; + +import org.apache.asterix.metadata.entities.FeedConnection; +import org.apache.asterix.metadata.utils.DatasetUtil; + +public class MetadataLockManager { + + public static final MetadataLockManager INSTANCE = new MetadataLockManager(); + private static final Function<String, MetadataLock> LOCK_FUNCTION = key -> new MetadataLock(); + private static final Function<String, DatasetLock> DATASET_LOCK_FUNCTION = key -> new DatasetLock(); + + private final ConcurrentHashMap<String, MetadataLock> dataversesLocks; + private final ConcurrentHashMap<String, DatasetLock> datasetsLocks; + private final ConcurrentHashMap<String, MetadataLock> functionsLocks; + private final ConcurrentHashMap<String, MetadataLock> nodeGroupsLocks; + private final ConcurrentHashMap<String, MetadataLock> feedsLocks; + private final ConcurrentHashMap<String, MetadataLock> feedPolicyLocks; + private final ConcurrentHashMap<String, MetadataLock> compactionPolicyLocks; + private final ConcurrentHashMap<String, MetadataLock> dataTypeLocks; + private final ConcurrentHashMap<String, MetadataLock> extensionLocks; + + private MetadataLockManager() { + dataversesLocks = new ConcurrentHashMap<>(); + datasetsLocks = new ConcurrentHashMap<>(); + functionsLocks = new ConcurrentHashMap<>(); + nodeGroupsLocks = new ConcurrentHashMap<>(); + feedsLocks = new ConcurrentHashMap<>(); + feedPolicyLocks = new ConcurrentHashMap<>(); + compactionPolicyLocks = new ConcurrentHashMap<>(); + dataTypeLocks = new ConcurrentHashMap<>(); + extensionLocks = new ConcurrentHashMap<>(); + } + + // Dataverse + public void acquireDataverseReadLock(LockList locks, String dataverseName) { + MetadataLock lock = dataversesLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION); + locks.add(IMetadataLock.Mode.READ, lock); + } + + public void acquireDataverseWriteLock(LockList locks, String dataverseName) { + MetadataLock lock = dataversesLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION); + locks.add(IMetadataLock.Mode.WRITE, lock); + } + + // Dataset + public void acquireDatasetReadLock(LockList locks, String datasetName) { + DatasetLock lock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION); + locks.add(IMetadataLock.Mode.READ, lock); + } + + public void acquireDatasetWriteLock(LockList locks, String datasetName) { + DatasetLock lock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION); + locks.add(IMetadataLock.Mode.WRITE, lock); + } + + public void acquireDatasetModifyLock(LockList locks, String datasetName) { + DatasetLock lock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION); + locks.add(IMetadataLock.Mode.MODIFY, lock); + } + + public void acquireDatasetCreateIndexLock(LockList locks, String datasetName) { + DatasetLock lock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION); + locks.add(IMetadataLock.Mode.INDEX_BUILD, lock); + } + + public void acquireExternalDatasetRefreshLock(LockList locks, String datasetName) { + DatasetLock lock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION); + locks.add(IMetadataLock.Mode.INDEX_BUILD, lock); + } + + // Function + public void acquireFunctionReadLock(LockList locks, String dataverseName) { + MetadataLock lock = functionsLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION); + locks.add(IMetadataLock.Mode.READ, lock); + } + + public void acquireFunctionWriteLock(LockList locks, String dataverseName) { + MetadataLock lock = functionsLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION); + locks.add(IMetadataLock.Mode.WRITE, lock); + } + + // Node Group + public void acquireNodeGroupReadLock(LockList locks, String dataverseName) { + MetadataLock lock = nodeGroupsLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION); + locks.add(IMetadataLock.Mode.READ, lock); + } + + public void acquireNodeGroupWriteLock(LockList locks, String dataverseName) { + MetadataLock lock = nodeGroupsLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION); + locks.add(IMetadataLock.Mode.WRITE, lock); + } + + // Feeds + public void acquireFeedReadLock(LockList locks, String dataverseName) { + MetadataLock lock = feedsLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION); + locks.add(IMetadataLock.Mode.READ, lock); + } + + public void acquireFeedWriteLock(LockList locks, String dataverseName) { + MetadataLock lock = feedsLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION); + locks.add(IMetadataLock.Mode.WRITE, lock); + } + + // Feed Policies + public void acquireFeedPolicyReadLock(LockList locks, String dataverseName) { + MetadataLock lock = feedPolicyLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION); + locks.add(IMetadataLock.Mode.READ, lock); + } + + public void acquireFeedPolicyWriteLock(LockList locks, String dataverseName) { + MetadataLock lock = feedPolicyLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION); + locks.add(IMetadataLock.Mode.WRITE, lock); + } + + // CompactionPolicy + public void acquireCompactionPolicyReadLock(LockList locks, String dataverseName) { + MetadataLock lock = compactionPolicyLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION); + locks.add(IMetadataLock.Mode.READ, lock); + } + + public void acquireCompactionPolicyWriteLock(LockList locks, String dataverseName) { + MetadataLock lock = compactionPolicyLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION); + locks.add(IMetadataLock.Mode.WRITE, lock); + } + + // DataType + public void acquireDataTypeReadLock(LockList locks, String dataverseName) { + MetadataLock lock = dataTypeLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION); + locks.add(IMetadataLock.Mode.READ, lock); + } + + public void acquireDataTypeWriteLock(LockList locks, String dataverseName) { + MetadataLock lock = dataTypeLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION); + locks.add(IMetadataLock.Mode.WRITE, lock); + } + + // Extensions + public void acquireExtensionReadLock(LockList locks, String dataverseName) { + MetadataLock lock = extensionLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION); + locks.add(IMetadataLock.Mode.READ, lock); + } + + public void acquireExtensionWriteLock(LockList locks, String dataverseName) { + MetadataLock lock = extensionLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION); + locks.add(IMetadataLock.Mode.WRITE, lock); + } + + public void createDatasetBegin(LockList locks, String dataverseName, String itemTypeDataverseName, + String itemTypeFullyQualifiedName, String metaItemTypeDataverseName, String metaItemTypeFullyQualifiedName, + String nodeGroupName, String compactionPolicyName, String datasetFullyQualifiedName, + boolean isDefaultCompactionPolicy) { + acquireDataverseReadLock(locks, dataverseName); + if (!dataverseName.equals(itemTypeDataverseName)) { + acquireDataverseReadLock(locks, itemTypeDataverseName); + } + if (metaItemTypeDataverseName != null && !metaItemTypeDataverseName.equals(dataverseName) + && !metaItemTypeDataverseName.equals(itemTypeDataverseName)) { + acquireDataverseReadLock(locks, metaItemTypeDataverseName); + } + acquireDataTypeReadLock(locks, itemTypeFullyQualifiedName); + if (metaItemTypeFullyQualifiedName != null + && !metaItemTypeFullyQualifiedName.equals(itemTypeFullyQualifiedName)) { + acquireDataTypeReadLock(locks, metaItemTypeFullyQualifiedName); + } + acquireNodeGroupReadLock(locks, nodeGroupName); + if (!isDefaultCompactionPolicy) { + acquireCompactionPolicyReadLock(locks, compactionPolicyName); + } + acquireDatasetWriteLock(locks, datasetFullyQualifiedName); + } + + public void createIndexBegin(LockList locks, String dataverseName, String datasetFullyQualifiedName) { + acquireDataverseReadLock(locks, dataverseName); + acquireDatasetCreateIndexLock(locks, datasetFullyQualifiedName); + } + + public void createTypeBegin(LockList locks, String dataverseName, String itemTypeFullyQualifiedName) { + acquireDataverseReadLock(locks, dataverseName); + acquireDataTypeWriteLock(locks, itemTypeFullyQualifiedName); + } + + public void dropDatasetBegin(LockList locks, String dataverseName, String datasetFullyQualifiedName) { + acquireDataverseReadLock(locks, dataverseName); + acquireDatasetWriteLock(locks, datasetFullyQualifiedName); + } + + public void dropIndexBegin(LockList locks, String dataverseName, String datasetFullyQualifiedName) { + acquireDataverseReadLock(locks, dataverseName); + acquireDatasetWriteLock(locks, datasetFullyQualifiedName); + } + + public void dropTypeBegin(LockList locks, String dataverseName, String dataTypeFullyQualifiedName) { + acquireDataverseReadLock(locks, dataverseName); + acquireDataTypeWriteLock(locks, dataTypeFullyQualifiedName); + } + + public void functionStatementBegin(LockList locks, String dataverseName, String functionFullyQualifiedName) { + acquireDataverseReadLock(locks, dataverseName); + acquireFunctionWriteLock(locks, functionFullyQualifiedName); + } + + public void modifyDatasetBegin(LockList locks, String dataverseName, String datasetFullyQualifiedName) { + acquireDataverseReadLock(locks, dataverseName); + acquireDatasetModifyLock(locks, datasetFullyQualifiedName); + } + + public void insertDeleteUpsertBegin(LockList locks, String datasetFullyQualifiedName) { + acquireDataverseReadLock(locks, DatasetUtil.getDataverseFromFullyQualifiedName(datasetFullyQualifiedName)); + acquireDatasetModifyLock(locks, datasetFullyQualifiedName); + } + + public void dropFeedBegin(LockList locks, String dataverseName, String feedFullyQualifiedName) { + acquireDataverseReadLock(locks, dataverseName); + acquireFeedWriteLock(locks, feedFullyQualifiedName); + } + + public void dropFeedPolicyBegin(LockList locks, String dataverseName, String policyName) { + acquireFeedWriteLock(locks, policyName); + acquireDataverseReadLock(locks, dataverseName); + } + + public void startFeedBegin(LockList locks, String dataverseName, String feedName, + List<FeedConnection> feedConnections) { + acquireDataverseReadLock(locks, dataverseName); + acquireFeedReadLock(locks, feedName); + for (FeedConnection feedConnection : feedConnections) { + // what if the dataset is in a different dataverse + String fqName = dataverseName + "." + feedConnection.getDatasetName(); + acquireDatasetReadLock(locks, fqName); + } + } + + public void stopFeedBegin(LockList locks, String dataverseName, String feedName) { + // TODO: dataset lock? + // Dataset locks are not required here since datasets are protected by the active event listener + acquireDataverseReadLock(locks, dataverseName); + acquireFeedReadLock(locks, feedName); + } + + public void createFeedBegin(LockList locks, String dataverseName, String feedFullyQualifiedName) { + acquireDataverseReadLock(locks, dataverseName); + acquireFeedWriteLock(locks, feedFullyQualifiedName); + } + + public void connectFeedBegin(LockList locks, String dataverseName, String datasetFullyQualifiedName, + String feedFullyQualifiedName) { + acquireDataverseReadLock(locks, dataverseName); + acquireDatasetReadLock(locks, datasetFullyQualifiedName); + acquireFeedReadLock(locks, feedFullyQualifiedName); + } + + public void createFeedPolicyBegin(LockList locks, String dataverseName, String policyName) { + acquireDataverseReadLock(locks, dataverseName); + acquireFeedPolicyWriteLock(locks, policyName); + } + + public void disconnectFeedBegin(LockList locks, String dataverseName, String datasetFullyQualifiedName, + String feedFullyQualifiedName) { + acquireDataverseReadLock(locks, dataverseName); + acquireDatasetReadLock(locks, datasetFullyQualifiedName); + acquireFeedReadLock(locks, feedFullyQualifiedName); + } + + public void compactBegin(LockList locks, String dataverseName, String datasetFullyQualifiedName) { + acquireDataverseReadLock(locks, dataverseName); + acquireDatasetReadLock(locks, datasetFullyQualifiedName); + } + + public void refreshDatasetBegin(LockList locks, String dataverseName, String datasetFullyQualifiedName) { + acquireDataverseReadLock(locks, dataverseName); + acquireExternalDatasetRefreshLock(locks, datasetFullyQualifiedName); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6eb0175f/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetLock.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetLock.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetLock.java deleted file mode 100644 index d53191c..0000000 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetLock.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.metadata.utils; - -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import org.apache.asterix.om.base.AMutableInt32; - -public class DatasetLock { - - private ReentrantReadWriteLock dsLock; - private ReentrantReadWriteLock dsModifyLock; - private AMutableInt32 indexBuildCounter; - - public DatasetLock() { - dsLock = new ReentrantReadWriteLock(true); - dsModifyLock = new ReentrantReadWriteLock(true); - indexBuildCounter = new AMutableInt32(0); - } - - public void acquireReadLock() { - // query - // build index - // insert - dsLock.readLock().lock(); - } - - public void releaseReadLock() { - // query - // build index - // insert - dsLock.readLock().unlock(); - } - - public void acquireWriteLock() { - // create ds - // delete ds - // drop index - dsLock.writeLock().lock(); - } - - public void releaseWriteLock() { - // create ds - // delete ds - // drop index - dsLock.writeLock().unlock(); - } - - public void acquireReadModifyLock() { - // insert - dsModifyLock.readLock().lock(); - } - - public void releaseReadModifyLock() { - // insert - dsModifyLock.readLock().unlock(); - } - - public void acquireWriteModifyLock() { - // Build index statement - synchronized (indexBuildCounter) { - if (indexBuildCounter.getIntegerValue() > 0) { - indexBuildCounter.setValue(indexBuildCounter.getIntegerValue() + 1); - } else { - dsModifyLock.writeLock().lock(); - indexBuildCounter.setValue(1); - } - } - } - - public void releaseWriteModifyLock() { - // Build index statement - synchronized (indexBuildCounter) { - if (indexBuildCounter.getIntegerValue() == 1) { - dsModifyLock.writeLock().unlock(); - } - indexBuildCounter.setValue(indexBuildCounter.getIntegerValue() - 1); - } - } - - public void acquireRefreshLock() { - // Refresh External Dataset statement - dsModifyLock.writeLock().lock(); - } - - public void releaseRefreshLock() { - // Refresh External Dataset statement - dsModifyLock.writeLock().unlock(); - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6eb0175f/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java index 572cc75..ca56cc3 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java @@ -94,8 +94,8 @@ public class DatasetUtil { private DatasetUtil() { } - public static IBinaryComparatorFactory[] computeKeysBinaryComparatorFactories(Dataset dataset, - ARecordType itemType, ARecordType metaItemType, IBinaryComparatorFactoryProvider comparatorFactoryProvider) + public static IBinaryComparatorFactory[] computeKeysBinaryComparatorFactories(Dataset dataset, ARecordType itemType, + ARecordType metaItemType, IBinaryComparatorFactoryProvider comparatorFactoryProvider) throws AlgebricksException { List<List<String>> partitioningKeys = getPartitioningKeys(dataset); IBinaryComparatorFactory[] bcfs = new IBinaryComparatorFactory[partitioningKeys.size()]; @@ -181,8 +181,7 @@ public class DatasetUtil { public static List<List<String>> getPartitioningKeys(Dataset dataset) { if (dataset.getDatasetType() == DatasetType.EXTERNAL) { - return IndexingConstants - .getRIDKeys(((ExternalDatasetDetails) dataset.getDatasetDetails()).getProperties()); + return IndexingConstants.getRIDKeys(((ExternalDatasetDetails) dataset.getDatasetDetails()).getProperties()); } return ((InternalDatasetDetails) dataset.getDatasetDetails()).getPartitioningKey(); } @@ -276,8 +275,7 @@ public class DatasetUtil { String compactionPolicyFactoryClassName = compactionPolicy.getClassName(); ILSMMergePolicyFactory mergePolicyFactory; try { - mergePolicyFactory = - (ILSMMergePolicyFactory) Class.forName(compactionPolicyFactoryClassName).newInstance(); + mergePolicyFactory = (ILSMMergePolicyFactory) Class.forName(compactionPolicyFactoryClassName).newInstance(); if (mergePolicyFactory.getName().compareTo("correlated-prefix") == 0) { ((CorrelatedPrefixMergePolicyFactory) mergePolicyFactory).setDatasetID(dataset.getDatasetId()); } @@ -338,8 +336,8 @@ public class DatasetUtil { metadataProvider.getSplitProviderAndConstraints(dataset); Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext()); - IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory( - metadataProvider, primaryIndex, itemType, metaType, compactionInfo.first, compactionInfo.second); + IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(metadataProvider, + primaryIndex, itemType, metaType, compactionInfo.first, compactionInfo.second); IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider(); IndexDropOperatorDescriptor primaryBtreeDrop = new IndexDropOperatorDescriptor(specPrimary, storageComponentProvider.getStorageManager(), @@ -393,8 +391,8 @@ public class DatasetUtil { Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext()); - IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory( - metadataProvider, primaryIndex, itemType, metaType, compactionInfo.first, compactionInfo.second); + IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(metadataProvider, + primaryIndex, itemType, metaType, compactionInfo.first, compactionInfo.second); IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider(); IndexDropOperatorDescriptor primaryBtreeDrop = new IndexDropOperatorDescriptor(specPrimary, storageComponentProvider.getStorageManager(), @@ -467,8 +465,8 @@ public class DatasetUtil { index, itemType, metaItemType, compactionInfo.first, compactionInfo.second); TreeIndexCreateOperatorDescriptor indexCreateOp = new TreeIndexCreateOperatorDescriptor(spec, RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER, - splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields, - dataflowHelperFactory, localResourceFactoryProvider, NoOpOperationCallbackFactory.INSTANCE, + splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields, dataflowHelperFactory, + localResourceFactoryProvider, NoOpOperationCallbackFactory.INSTANCE, storageComponentProvider.getMetadataPageManagerFactory()); AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp, splitsAndConstraint.second); @@ -518,4 +516,13 @@ public class DatasetUtil { spec.addRoot(compactOp); return spec; } + + public static boolean isFullyQualifiedName(String datasetName) { + return datasetName.indexOf('.') > 0; //NOSONAR a fully qualified name can't start with a . + } + + public static String getDataverseFromFullyQualifiedName(String datasetName) { + int idx = datasetName.indexOf('.'); + return datasetName.substring(0, idx); + } }
