Repository: asterixdb Updated Branches: refs/heads/master ff915a9ec -> 6eb0175f9
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6eb0175f/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalDatasetsRegistry.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalDatasetsRegistry.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalDatasetsRegistry.java deleted file mode 100644 index 2e35ed4..0000000 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalDatasetsRegistry.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.metadata.utils; - -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.asterix.metadata.declared.MetadataProvider; -import org.apache.asterix.metadata.entities.Dataset; - -/** - * This is a singelton class used to maintain the version of each external dataset with indexes - * It should be consolidated once a better global dataset lock management is introduced. - * - * @author alamouda - */ -public class ExternalDatasetsRegistry { - public static final ExternalDatasetsRegistry INSTANCE = new ExternalDatasetsRegistry(); - private final ConcurrentHashMap<String, ExternalDatasetAccessManager> globalRegister; - - private ExternalDatasetsRegistry() { - globalRegister = new ConcurrentHashMap<>(); - } - - /** - * Get the current version of the dataset - * - * @param dataset - * @return - */ - public int getDatasetVersion(Dataset dataset) { - String key = dataset.getDataverseName() + "." + dataset.getDatasetName(); - ExternalDatasetAccessManager datasetAccessMgr = globalRegister.get(key); - if (datasetAccessMgr == null) { - globalRegister.putIfAbsent(key, new ExternalDatasetAccessManager()); - datasetAccessMgr = globalRegister.get(key); - } - return datasetAccessMgr.getVersion(); - } - - public int getAndLockDatasetVersion(Dataset dataset, MetadataProvider metadataProvider) { - - Map<String, Integer> locks; - String lockKey = dataset.getDataverseName() + "." + dataset.getDatasetName(); - // check first if the lock was aquired already - locks = metadataProvider.getLocks(); - if (locks == null) { - locks = new HashMap<>(); - metadataProvider.setLocks(locks); - } else { - // if dataset was accessed already by this job, return the registered version - Integer version = locks.get(lockKey); - if (version != null) { - return version; - } - } - - ExternalDatasetAccessManager datasetAccessMgr = globalRegister.get(lockKey); - if (datasetAccessMgr == null) { - globalRegister.putIfAbsent(lockKey, new ExternalDatasetAccessManager()); - datasetAccessMgr = globalRegister.get(lockKey); - } - - // aquire the correct lock - int version = datasetAccessMgr.queryBegin(); - locks.put(lockKey, version); - return version; - } - - public void refreshBegin(Dataset dataset) { - String key = dataset.getDataverseName() + "." + dataset.getDatasetName(); - ExternalDatasetAccessManager datasetAccessMgr = globalRegister.get(key); - if (datasetAccessMgr == null) { - datasetAccessMgr = globalRegister.put(key, new ExternalDatasetAccessManager()); - } - // aquire the correct lock - datasetAccessMgr.refreshBegin(); - } - - public void removeDatasetInfo(Dataset dataset) { - String key = dataset.getDataverseName() + "." + dataset.getDatasetName(); - globalRegister.remove(key); - } - - public void refreshEnd(Dataset dataset, boolean success) { - String key = dataset.getDataverseName() + "." + dataset.getDatasetName(); - globalRegister.get(key).refreshEnd(success); - } - - public void buildIndexBegin(Dataset dataset, boolean firstIndex) { - String key = dataset.getDataverseName() + "." + dataset.getDatasetName(); - ExternalDatasetAccessManager datasetAccessMgr = globalRegister.get(key); - if (datasetAccessMgr == null) { - globalRegister.putIfAbsent(key, new ExternalDatasetAccessManager()); - datasetAccessMgr = globalRegister.get(key); - } - // aquire the correct lock - datasetAccessMgr.buildIndexBegin(firstIndex); - } - - public void buildIndexEnd(Dataset dataset, boolean firstIndex) { - String key = dataset.getDataverseName() + "." + dataset.getDatasetName(); - globalRegister.get(key).buildIndexEnd(firstIndex); - } - - public void releaseAcquiredLocks(MetadataProvider metadataProvider) { - Map<String, Integer> locks = metadataProvider.getLocks(); - if (locks == null) { - return; - } else { - // if dataset was accessed already by this job, return the registered version - Set<Entry<String, Integer>> aquiredLocks = locks.entrySet(); - for (Entry<String, Integer> entry : aquiredLocks) { - ExternalDatasetAccessManager accessManager = globalRegister.get(entry.getKey()); - if (accessManager != null) { - accessManager.queryEnd(entry.getValue()); - } - } - locks.clear(); - } - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6eb0175f/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java deleted file mode 100644 index 9292008..0000000 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java +++ /dev/null @@ -1,587 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.metadata.utils; - -import java.util.Collections; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.function.Function; - -import org.apache.asterix.metadata.entities.Dataverse; -import org.apache.asterix.metadata.entities.FeedConnection; - -public class MetadataLockManager { - - public static final MetadataLockManager INSTANCE = new MetadataLockManager(); - private static final Function<String, ReentrantReadWriteLock> LOCK_FUNCTION = key -> new ReentrantReadWriteLock(); - private static final Function<String, DatasetLock> DATASET_LOCK_FUNCTION = key -> new DatasetLock(); - private final ConcurrentHashMap<String, ReentrantReadWriteLock> dataversesLocks; - private final ConcurrentHashMap<String, DatasetLock> datasetsLocks; - private final ConcurrentHashMap<String, ReentrantReadWriteLock> functionsLocks; - private final ConcurrentHashMap<String, ReentrantReadWriteLock> nodeGroupsLocks; - private final ConcurrentHashMap<String, ReentrantReadWriteLock> feedsLocks; - private final ConcurrentHashMap<String, ReentrantReadWriteLock> feedPolicyLocks; - private final ConcurrentHashMap<String, ReentrantReadWriteLock> compactionPolicyLocks; - private final ConcurrentHashMap<String, ReentrantReadWriteLock> dataTypeLocks; - private final ConcurrentHashMap<String, ReentrantReadWriteLock> extensionLocks; - - private MetadataLockManager() { - dataversesLocks = new ConcurrentHashMap<>(); - datasetsLocks = new ConcurrentHashMap<>(); - functionsLocks = new ConcurrentHashMap<>(); - nodeGroupsLocks = new ConcurrentHashMap<>(); - feedsLocks = new ConcurrentHashMap<>(); - feedPolicyLocks = new ConcurrentHashMap<>(); - compactionPolicyLocks = new ConcurrentHashMap<>(); - dataTypeLocks = new ConcurrentHashMap<>(); - extensionLocks = new ConcurrentHashMap<>(); - } - - public void acquireDataverseReadLock(String dataverseName) { - ReentrantReadWriteLock dvLock = dataversesLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION); - dvLock.readLock().lock(); - } - - public void releaseDataverseReadLock(String dataverseName) { - dataversesLocks.get(dataverseName).readLock().unlock(); - } - - public void acquireDataverseWriteLock(String dataverseName) { - ReentrantReadWriteLock dvLock = dataversesLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION); - dvLock.writeLock().lock(); - } - - public void releaseDataverseWriteLock(String dataverseName) { - dataversesLocks.get(dataverseName).writeLock().unlock(); - } - - public void acquireDatasetReadLock(String datasetName) { - DatasetLock dsLock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION); - dsLock.acquireReadLock(); - } - - public void releaseDatasetReadLock(String datasetName) { - datasetsLocks.get(datasetName).releaseReadLock(); - } - - public void acquireDatasetWriteLock(String datasetName) { - DatasetLock dsLock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION); - dsLock.acquireWriteLock(); - } - - public void releaseDatasetWriteLock(String datasetName) { - datasetsLocks.get(datasetName).releaseWriteLock(); - } - - public void acquireDatasetModifyLock(String datasetName) { - DatasetLock dsLock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION); - dsLock.acquireReadLock(); - dsLock.acquireReadModifyLock(); - } - - public void releaseDatasetModifyLock(String datasetName) { - DatasetLock dsLock = datasetsLocks.get(datasetName); - dsLock.releaseReadModifyLock(); - dsLock.releaseReadLock(); - } - - public void acquireDatasetCreateIndexLock(String datasetName) { - DatasetLock dsLock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION); - dsLock.acquireReadLock(); - dsLock.acquireWriteModifyLock(); - } - - public void releaseDatasetCreateIndexLock(String datasetName) { - DatasetLock dsLock = datasetsLocks.get(datasetName); - dsLock.releaseWriteModifyLock(); - dsLock.releaseReadLock(); - } - - public void acquireExternalDatasetRefreshLock(String datasetName) { - DatasetLock dsLock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION); - dsLock.acquireReadLock(); - dsLock.acquireRefreshLock(); - } - - public void releaseExternalDatasetRefreshLock(String datasetName) { - DatasetLock dsLock = datasetsLocks.get(datasetName); - dsLock.releaseRefreshLock(); - dsLock.releaseReadLock(); - } - - public void acquireFunctionReadLock(String functionName) { - ReentrantReadWriteLock fLock = functionsLocks.computeIfAbsent(functionName, LOCK_FUNCTION); - fLock.readLock().lock(); - } - - public void releaseFunctionReadLock(String functionName) { - functionsLocks.get(functionName).readLock().unlock(); - } - - public void acquireFunctionWriteLock(String functionName) { - ReentrantReadWriteLock fLock = functionsLocks.computeIfAbsent(functionName, LOCK_FUNCTION); - fLock.writeLock().lock(); - } - - public void releaseFunctionWriteLock(String functionName) { - functionsLocks.get(functionName).writeLock().unlock(); - } - - public void acquireNodeGroupReadLock(String nodeGroupName) { - ReentrantReadWriteLock ngLock = nodeGroupsLocks.computeIfAbsent(nodeGroupName, LOCK_FUNCTION); - ngLock.readLock().lock(); - } - - public void releaseNodeGroupReadLock(String nodeGroupName) { - nodeGroupsLocks.get(nodeGroupName).readLock().unlock(); - } - - public void acquireNodeGroupWriteLock(String nodeGroupName) { - ReentrantReadWriteLock ngLock = nodeGroupsLocks.computeIfAbsent(nodeGroupName, LOCK_FUNCTION); - ngLock.writeLock().lock(); - } - - public void releaseNodeGroupWriteLock(String nodeGroupName) { - nodeGroupsLocks.get(nodeGroupName).writeLock().unlock(); - } - - public void acquireFeedReadLock(String feedName) { - ReentrantReadWriteLock fLock = feedsLocks.computeIfAbsent(feedName, LOCK_FUNCTION); - fLock.readLock().lock(); - } - - public void releaseFeedReadLock(String feedName) { - feedsLocks.get(feedName).readLock().unlock(); - } - - public void acquireFeedWriteLock(String feedName) { - ReentrantReadWriteLock fLock = feedsLocks.computeIfAbsent(feedName, LOCK_FUNCTION); - fLock.writeLock().lock(); - } - - public void releaseFeedWriteLock(String feedName) { - feedsLocks.get(feedName).writeLock().unlock(); - } - - public void acquireFeedPolicyWriteLock(String policyName) { - ReentrantReadWriteLock fLock = feedPolicyLocks.computeIfAbsent(policyName, LOCK_FUNCTION); - fLock.writeLock().lock(); - } - - public void releaseFeedPolicyWriteLock(String policyName) { - feedPolicyLocks.get(policyName).writeLock().unlock(); - } - - public void acquireCompactionPolicyReadLock(String compactionPolicyName) { - ReentrantReadWriteLock compactionPolicyLock = - compactionPolicyLocks.computeIfAbsent(compactionPolicyName, LOCK_FUNCTION); - compactionPolicyLock.readLock().lock(); - } - - public void releaseCompactionPolicyReadLock(String compactionPolicyName) { - compactionPolicyLocks.get(compactionPolicyName).readLock().unlock(); - } - - public void acquireCompactionPolicyWriteLock(String compactionPolicyName) { - ReentrantReadWriteLock compactionPolicyLock = - compactionPolicyLocks.computeIfAbsent(compactionPolicyName, LOCK_FUNCTION); - compactionPolicyLock.writeLock().lock(); - } - - public void releaseCompactionPolicyWriteLock(String compactionPolicyName) { - compactionPolicyLocks.get(compactionPolicyName).writeLock().unlock(); - } - - public void acquireDataTypeReadLock(String dataTypeName) { - ReentrantReadWriteLock dataTypeLock = dataTypeLocks.computeIfAbsent(dataTypeName, LOCK_FUNCTION); - dataTypeLock.readLock().lock(); - } - - public void releaseDataTypeReadLock(String dataTypeName) { - dataTypeLocks.get(dataTypeName).readLock().unlock(); - } - - public void acquireDataTypeWriteLock(String dataTypeName) { - ReentrantReadWriteLock dataTypeLock = dataTypeLocks.computeIfAbsent(dataTypeName, LOCK_FUNCTION); - dataTypeLock.writeLock().lock(); - } - - public void releaseDataTypeWriteLock(String dataTypeName) { - dataTypeLocks.get(dataTypeName).writeLock().unlock(); - } - - public void createDatasetBegin(String dataverseName, String itemTypeDataverseName, - String itemTypeFullyQualifiedName, String metaItemTypeDataverseName, String metaItemTypeFullyQualifiedName, - String nodeGroupName, String compactionPolicyName, String datasetFullyQualifiedName, - boolean isDefaultCompactionPolicy) { - acquireDataverseReadLock(dataverseName); - if (!dataverseName.equals(itemTypeDataverseName)) { - acquireDataverseReadLock(itemTypeDataverseName); - } - if (metaItemTypeDataverseName != null && !metaItemTypeDataverseName.equals(dataverseName) - && !metaItemTypeDataverseName.equals(itemTypeDataverseName)) { - acquireDataverseReadLock(metaItemTypeDataverseName); - } - acquireDataTypeReadLock(itemTypeFullyQualifiedName); - if (metaItemTypeFullyQualifiedName != null - && !metaItemTypeFullyQualifiedName.equals(itemTypeFullyQualifiedName)) { - acquireDataTypeReadLock(metaItemTypeFullyQualifiedName); - } - acquireNodeGroupReadLock(nodeGroupName); - if (!isDefaultCompactionPolicy) { - acquireCompactionPolicyReadLock(compactionPolicyName); - } - acquireDatasetWriteLock(datasetFullyQualifiedName); - } - - public void createDatasetEnd(String dataverseName, String itemTypeDataverseName, String itemTypeFullyQualifiedName, - String metaItemTypeDataverseName, String metaItemTypeFullyQualifiedName, String nodeGroupName, - String compactionPolicyName, String datasetFullyQualifiedName, boolean isDefaultCompactionPolicy) { - releaseDatasetWriteLock(datasetFullyQualifiedName); - if (!isDefaultCompactionPolicy) { - releaseCompactionPolicyReadLock(compactionPolicyName); - } - releaseNodeGroupReadLock(nodeGroupName); - if (metaItemTypeFullyQualifiedName != null - && !metaItemTypeFullyQualifiedName.equals(itemTypeFullyQualifiedName)) { - releaseDataTypeReadLock(metaItemTypeFullyQualifiedName); - } - releaseDataTypeReadLock(itemTypeFullyQualifiedName); - if (metaItemTypeDataverseName != null && !metaItemTypeDataverseName.equals(dataverseName) - && !metaItemTypeDataverseName.equals(itemTypeDataverseName)) { - releaseDataverseReadLock(metaItemTypeDataverseName); - } - if (!dataverseName.equals(itemTypeDataverseName)) { - releaseDataverseReadLock(itemTypeDataverseName); - } - releaseDataverseReadLock(dataverseName); - } - - public void createIndexBegin(String dataverseName, String datasetFullyQualifiedName) { - acquireDataverseReadLock(dataverseName); - acquireDatasetCreateIndexLock(datasetFullyQualifiedName); - } - - public void createIndexEnd(String dataverseName, String datasetFullyQualifiedName) { - releaseDatasetCreateIndexLock(datasetFullyQualifiedName); - releaseDataverseReadLock(dataverseName); - } - - public void createTypeBegin(String dataverseName, String itemTypeFullyQualifiedName) { - acquireDataverseReadLock(dataverseName); - acquireDataTypeWriteLock(itemTypeFullyQualifiedName); - } - - public void createTypeEnd(String dataverseName, String itemTypeFullyQualifiedName) { - releaseDataTypeWriteLock(itemTypeFullyQualifiedName); - releaseDataverseReadLock(dataverseName); - } - - public void dropDatasetBegin(String dataverseName, String datasetFullyQualifiedName) { - acquireDataverseReadLock(dataverseName); - acquireDatasetWriteLock(datasetFullyQualifiedName); - } - - public void dropDatasetEnd(String dataverseName, String datasetFullyQualifiedName) { - releaseDatasetWriteLock(datasetFullyQualifiedName); - releaseDataverseReadLock(dataverseName); - } - - public void dropIndexBegin(String dataverseName, String datasetFullyQualifiedName) { - acquireDataverseReadLock(dataverseName); - acquireDatasetWriteLock(datasetFullyQualifiedName); - } - - public void dropIndexEnd(String dataverseName, String datasetFullyQualifiedName) { - releaseDatasetWriteLock(datasetFullyQualifiedName); - releaseDataverseReadLock(dataverseName); - } - - public void dropTypeBegin(String dataverseName, String dataTypeFullyQualifiedName) { - acquireDataverseReadLock(dataverseName); - acquireDataTypeWriteLock(dataTypeFullyQualifiedName); - } - - public void dropTypeEnd(String dataverseName, String dataTypeFullyQualifiedName) { - releaseDataTypeWriteLock(dataTypeFullyQualifiedName); - releaseDataverseReadLock(dataverseName); - } - - public void functionStatementBegin(String dataverseName, String functionFullyQualifiedName) { - acquireDataverseReadLock(dataverseName); - acquireFunctionWriteLock(functionFullyQualifiedName); - } - - public void functionStatementEnd(String dataverseName, String functionFullyQualifiedName) { - releaseFunctionWriteLock(functionFullyQualifiedName); - releaseDataverseReadLock(dataverseName); - } - - public void modifyDatasetBegin(String dataverseName, String datasetFullyQualifiedName) { - acquireDataverseReadLock(dataverseName); - acquireDatasetModifyLock(datasetFullyQualifiedName); - } - - public void modifyDatasetEnd(String dataverseName, String datasetFullyQualifiedName) { - releaseDatasetModifyLock(datasetFullyQualifiedName); - releaseDataverseReadLock(dataverseName); - } - - public void insertDeleteUpsertBegin(String dataverseName, String datasetFullyQualifiedName, List<String> dataverses, - List<String> datasets) { - dataverses.add(dataverseName); - datasets.add(datasetFullyQualifiedName); - Collections.sort(dataverses); - Collections.sort(datasets); - - String previous = null; - for (int i = 0; i < dataverses.size(); i++) { - String current = dataverses.get(i); - if (!current.equals(previous)) { - acquireDataverseReadLock(current); - previous = current; - } - } - - for (int i = 0; i < datasets.size(); i++) { - String current = datasets.get(i); - if (!current.equals(previous)) { - if (current.equals(datasetFullyQualifiedName)) { - acquireDatasetModifyLock(current); - } else { - acquireDatasetReadLock(current); - } - previous = current; - } - } - } - - public void insertDeleteUpsertEnd(String dataverseName, String datasetFullyQualifiedName, List<String> dataverses, - List<String> datasets) { - String previous = null; - for (int i = dataverses.size() - 1; i >= 0; i--) { - String current = dataverses.get(i); - if (!current.equals(previous)) { - releaseDataverseReadLock(current); - previous = current; - } - } - for (int i = datasets.size() - 1; i >= 0; i--) { - String current = datasets.get(i); - if (!current.equals(previous)) { - if (current.equals(datasetFullyQualifiedName)) { - releaseDatasetModifyLock(current); - } else { - releaseDatasetReadLock(current); - } - previous = current; - } - } - } - - public void dropFeedBegin(String dataverseName, String feedFullyQualifiedName) { - acquireDataverseReadLock(dataverseName); - acquireFeedWriteLock(feedFullyQualifiedName); - } - - public void dropFeedEnd(String dataverseName, String feedFullyQualifiedName) { - releaseFeedWriteLock(feedFullyQualifiedName); - releaseDataverseReadLock(dataverseName); - } - - public void dropFeedPolicyBegin(String dataverseName, String policyName) { - acquireFeedWriteLock(policyName); - acquireDataverseReadLock(dataverseName); - } - - public void dropFeedPolicyEnd(String dataverseName, String policyName) { - releaseFeedWriteLock(policyName); - releaseDataverseReadLock(dataverseName); - } - - public void startFeedBegin(String dataverseName, String feedName, List<FeedConnection> feedConnections) { - acquireDataverseReadLock(dataverseName); - acquireFeedReadLock(feedName); - for (FeedConnection feedConnection : feedConnections) { - acquireDatasetReadLock(dataverseName + "." + feedConnection.getDatasetName()); - } - } - - public void startFeedEnd(String dataverseName, String feedName, List<FeedConnection> feedConnections) { - releaseDataverseReadLock(dataverseName); - releaseFeedReadLock(feedName); - for (FeedConnection feedConnection : feedConnections) { - releaseDatasetReadLock(dataverseName + "." + feedConnection.getDatasetName()); - } - } - - public void StopFeedBegin(String dataverseName, String feedName) { - // TODO: dataset lock? - acquireDataverseReadLock(dataverseName); - acquireFeedReadLock(feedName); - } - - public void StopFeedEnd(String dataverseName, String feedName) { - releaseDataverseReadLock(dataverseName); - releaseFeedReadLock(feedName); - } - - public void createFeedBegin(String dataverseName, String feedFullyQualifiedName) { - acquireDataverseReadLock(dataverseName); - acquireFeedWriteLock(feedFullyQualifiedName); - } - - public void createFeedEnd(String dataverseName, String feedFullyQualifiedName) { - releaseFeedWriteLock(feedFullyQualifiedName); - releaseDataverseReadLock(dataverseName); - } - - public void connectFeedBegin(String dataverseName, String datasetFullyQualifiedName, - String feedFullyQualifiedName) { - acquireDataverseReadLock(dataverseName); - acquireDatasetReadLock(datasetFullyQualifiedName); - acquireFeedReadLock(feedFullyQualifiedName); - } - - public void connectFeedEnd(String dataverseName, String datasetFullyQualifiedName, String feedFullyQualifiedName) { - releaseFeedReadLock(feedFullyQualifiedName); - releaseDatasetReadLock(datasetFullyQualifiedName); - releaseDataverseReadLock(dataverseName); - } - - public void createFeedPolicyBegin(String dataverseName, String policyName) { - acquireDataverseReadLock(dataverseName); - acquireFeedPolicyWriteLock(policyName); - } - - public void createFeedPolicyEnd(String dataverseName, String policyName) { - releaseFeedPolicyWriteLock(policyName); - releaseDataverseReadLock(dataverseName); - } - - public void disconnectFeedBegin(String dataverseName, String datasetFullyQualifiedName, - String feedFullyQualifiedName) { - acquireDataverseReadLock(dataverseName); - acquireDatasetReadLock(datasetFullyQualifiedName); - acquireFeedReadLock(feedFullyQualifiedName); - } - - public void disconnectFeedEnd(String dataverseName, String datasetFullyQualifiedName, - String feedFullyQualifiedName) { - releaseFeedReadLock(feedFullyQualifiedName); - releaseDatasetReadLock(datasetFullyQualifiedName); - releaseDataverseReadLock(dataverseName); - } - - public void subscribeFeedBegin(String dataverseName, String datasetFullyQualifiedName, - String feedFullyQualifiedName) { - acquireDataverseReadLock(dataverseName); - acquireDatasetReadLock(datasetFullyQualifiedName); - acquireFeedReadLock(feedFullyQualifiedName); - } - - public void subscribeFeedEnd(String dataverseName, String datasetFullyQualifiedName, - String feedFullyQualifiedName) { - releaseFeedReadLock(feedFullyQualifiedName); - releaseDatasetReadLock(datasetFullyQualifiedName); - releaseDataverseReadLock(dataverseName); - } - - public void compactBegin(String dataverseName, String datasetFullyQualifiedName) { - acquireDataverseReadLock(dataverseName); - acquireDatasetReadLock(datasetFullyQualifiedName); - } - - public void compactEnd(String dataverseName, String datasetFullyQualifiedName) { - releaseDatasetReadLock(datasetFullyQualifiedName); - releaseDataverseReadLock(dataverseName); - } - - public void queryBegin(Dataverse dataverse, List<String> dataverses, List<String> datasets) { - if (dataverse != null) { - dataverses.add(dataverse.getDataverseName()); - } - Collections.sort(dataverses); - Collections.sort(datasets); - - String previous = null; - for (int i = 0; i < dataverses.size(); i++) { - String current = dataverses.get(i); - if (!current.equals(previous)) { - acquireDataverseReadLock(current); - previous = current; - } - } - - for (int i = 0; i < datasets.size(); i++) { - String current = datasets.get(i); - if (!current.equals(previous)) { - acquireDatasetReadLock(current); - previous = current; - } - } - } - - public void queryEnd(List<String> dataverses, List<String> datasets) { - String previous = null; - for (int i = dataverses.size() - 1; i >= 0; i--) { - String current = dataverses.get(i); - if (!current.equals(previous)) { - releaseDataverseReadLock(current); - previous = current; - } - } - for (int i = datasets.size() - 1; i >= 0; i--) { - String current = datasets.get(i); - if (!current.equals(previous)) { - releaseDatasetReadLock(current); - previous = current; - } - } - } - - public void refreshDatasetBegin(String dataverseName, String datasetFullyQualifiedName) { - acquireDataverseReadLock(dataverseName); - acquireExternalDatasetRefreshLock(datasetFullyQualifiedName); - } - - public void refreshDatasetEnd(String dataverseName, String datasetFullyQualifiedName) { - releaseExternalDatasetRefreshLock(datasetFullyQualifiedName); - releaseDataverseReadLock(dataverseName); - } - - public void acquireExtensionReadLock(String entityName) { - ReentrantReadWriteLock entityLock = extensionLocks.computeIfAbsent(entityName, LOCK_FUNCTION); - entityLock.readLock().lock(); - } - - public void releaseExtensionReadLock(String entityName) { - extensionLocks.get(entityName).readLock().unlock(); - } - - public void acquireExtensionWriteLock(String entityName) { - ReentrantReadWriteLock entityLock = extensionLocks.computeIfAbsent(entityName, LOCK_FUNCTION); - entityLock.writeLock().lock(); - } - - public void releaseExtensionWriteLock(String entityName) { - extensionLocks.get(entityName).writeLock().unlock(); - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6eb0175f/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/RTreeDataflowHelperFactoryProvider.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/RTreeDataflowHelperFactoryProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/RTreeDataflowHelperFactoryProvider.java index 8859b9d..45034de 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/RTreeDataflowHelperFactoryProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/RTreeDataflowHelperFactoryProvider.java @@ -32,6 +32,7 @@ import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.metadata.entities.Index; import org.apache.asterix.metadata.entities.InternalDatasetDetails; +import org.apache.asterix.metadata.lock.ExternalDatasetsRegistry; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.ATypeTag; import org.apache.asterix.om.types.IAType; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6eb0175f/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java index 35e7acb..a4b849f 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java @@ -48,19 +48,19 @@ public class SplitsAndConstraintsUtil { ClusterPartition[] clusterPartition = ClusterStateManager.INSTANCE.getClusterPartitons(); String storageDirName = ClusterProperties.INSTANCE.getStorageDirectoryName(); for (int j = 0; j < clusterPartition.length; j++) { - File f = new File(StoragePathUtil.prepareStoragePartitionPath(storageDirName, - clusterPartition[j].getPartitionId()) + File.separator + relPathFile); - splits.add(StoragePathUtil.getFileSplitForClusterPartition(clusterPartition[j], f.getPath())); + File f = new File( + StoragePathUtil.prepareStoragePartitionPath(storageDirName, clusterPartition[j].getPartitionId()) + + File.separator + relPathFile); + splits.add(StoragePathUtil.getFileSplitForClusterPartition(clusterPartition[j], f.getPath())); } return splits.toArray(new FileSplit[] {}); } - public static FileSplit[] getDatasetSplits(MetadataTransactionContext mdTxnCtx, String dataverseName, - String datasetName, String targetIdxName, boolean temp) throws AlgebricksException { + public static FileSplit[] getDatasetSplits(Dataset dataset, MetadataTransactionContext mdTxnCtx, + String targetIdxName, boolean temp) throws AlgebricksException { try { - File relPathFile = - new File(StoragePathUtil.prepareDataverseIndexName(dataverseName, datasetName, targetIdxName)); - Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName); + File relPathFile = new File(StoragePathUtil.prepareDataverseIndexName(dataset.getDataverseName(), + dataset.getDatasetName(), targetIdxName)); List<String> nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName()).getNodeNames(); if (nodeGroup == null) { @@ -92,12 +92,11 @@ public class SplitsAndConstraintsUtil { } } - private static FileSplit[] getFilesIndexSplits(MetadataTransactionContext mdTxnCtx, String dataverseName, - String datasetName, String targetIdxName, boolean create) throws AlgebricksException { + private static FileSplit[] getFilesIndexSplits(Dataset dataset, MetadataTransactionContext mdTxnCtx, + String targetIdxName, boolean create) throws AlgebricksException { try { - File relPathFile = - new File(StoragePathUtil.prepareDataverseIndexName(dataverseName, datasetName, targetIdxName)); - Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName); + File relPathFile = new File(StoragePathUtil.prepareDataverseIndexName(dataset.getDataverseName(), + dataset.getDatasetName(), targetIdxName)); List<String> nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName()).getNodeNames(); if (nodeGroup == null) { @@ -114,14 +113,14 @@ public class SplitsAndConstraintsUtil { // Only the first partition when create File f = new File(StoragePathUtil.prepareStoragePartitionPath(storageDirName, nodePartitions[firstPartition].getPartitionId()) + File.separator + relPathFile); - splits.add(StoragePathUtil.getFileSplitForClusterPartition(nodePartitions[firstPartition], f - .getPath())); + splits.add(StoragePathUtil.getFileSplitForClusterPartition(nodePartitions[firstPartition], + f.getPath())); } else { for (int k = 0; k < nodePartitions.length; k++) { File f = new File(StoragePathUtil.prepareStoragePartitionPath(storageDirName, nodePartitions[firstPartition].getPartitionId()) + File.separator + relPathFile); - splits.add(StoragePathUtil.getFileSplitForClusterPartition(nodePartitions[firstPartition], f - .getPath())); + splits.add(StoragePathUtil.getFileSplitForClusterPartition(nodePartitions[firstPartition], + f.getPath())); } } } @@ -138,9 +137,9 @@ public class SplitsAndConstraintsUtil { } public static Pair<IFileSplitProvider, AlgebricksPartitionConstraint> getFilesIndexSplitProviderAndConstraints( - MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName, String targetIdxName, - boolean create) throws AlgebricksException { - FileSplit[] splits = getFilesIndexSplits(mdTxnCtx, dataverseName, datasetName, targetIdxName, create); + Dataset dataset, MetadataTransactionContext mdTxnCtx, String targetIdxName, boolean create) + throws AlgebricksException { + FileSplit[] splits = getFilesIndexSplits(dataset, mdTxnCtx, targetIdxName, create); return StoragePathUtil.splitProviderAndPartitionConstraints(splits); }
