http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLock.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLock.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLock.java index 6c20e9f..48f315b 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLock.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLock.java @@ -21,6 +21,8 @@ package org.apache.asterix.metadata.lock; import java.util.Objects; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.asterix.common.metadata.IMetadataLock; + public class MetadataLock implements IMetadataLock { private final String key; private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); @@ -30,7 +32,7 @@ public class MetadataLock implements IMetadataLock { } @Override - public void acquire(IMetadataLock.Mode mode) { + public void lock(IMetadataLock.Mode mode) { switch (mode) { case WRITE: lock.writeLock().lock(); @@ -42,7 +44,7 @@ public class MetadataLock implements IMetadataLock { } @Override - public void release(IMetadataLock.Mode mode) { + public void unlock(IMetadataLock.Mode mode) { switch (mode) { case WRITE: lock.writeLock().unlock(); @@ -73,4 +75,9 @@ public class MetadataLock implements IMetadataLock { } return Objects.equals(key, ((MetadataLock) o).key); } + + @Override + public String toString() { + return key; + } }
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLockManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLockManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLockManager.java index 43d72e3..779fe2a 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLockManager.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLockManager.java @@ -18,17 +18,17 @@ */ package org.apache.asterix.metadata.lock; -import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; +import org.apache.asterix.common.api.IMetadataLockManager; import org.apache.asterix.common.exceptions.AsterixException; -import org.apache.asterix.metadata.entities.FeedConnection; -import org.apache.asterix.metadata.utils.DatasetUtil; +import org.apache.asterix.common.metadata.IMetadataLock; +import org.apache.asterix.common.metadata.LockList; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -public class MetadataLockManager { +public class MetadataLockManager implements IMetadataLockManager { - public static final MetadataLockManager INSTANCE = new MetadataLockManager(); private static final Function<String, MetadataLock> LOCK_FUNCTION = MetadataLock::new; private static final Function<String, DatasetLock> DATASET_LOCK_FUNCTION = DatasetLock::new; @@ -38,286 +38,175 @@ public class MetadataLockManager { private static final String DATASET_PREFIX = "Dataset:"; private static final String FUNCTION_PREFIX = "Function:"; private static final String NODE_GROUP_PREFIX = "NodeGroup:"; - private static final String FEED_PREFIX = "Feed:"; + private static final String ACTIVE_PREFIX = "Active:"; private static final String FEED_POLICY_PREFIX = "FeedPolicy:"; private static final String MERGE_POLICY_PREFIX = "MergePolicy:"; private static final String DATATYPE_PREFIX = "DataType:"; private static final String EXTENSION_PREFIX = "Extension:"; - private MetadataLockManager() { + public MetadataLockManager() { mdlocks = new ConcurrentHashMap<>(); } - // Dataverse + @Override public void acquireDataverseReadLock(LockList locks, String dataverseName) throws AsterixException { String key = DATAVERSE_PREFIX + dataverseName; IMetadataLock lock = mdlocks.computeIfAbsent(key, LOCK_FUNCTION); locks.add(IMetadataLock.Mode.READ, lock); } + @Override public void acquireDataverseWriteLock(LockList locks, String dataverseName) throws AsterixException { String key = DATAVERSE_PREFIX + dataverseName; IMetadataLock lock = mdlocks.computeIfAbsent(key, LOCK_FUNCTION); locks.add(IMetadataLock.Mode.WRITE, lock); } - // Dataset + @Override public void acquireDatasetReadLock(LockList locks, String datasetName) throws AsterixException { String key = DATASET_PREFIX + datasetName; DatasetLock lock = (DatasetLock) mdlocks.computeIfAbsent(key, DATASET_LOCK_FUNCTION); locks.add(IMetadataLock.Mode.READ, lock); } + @Override public void acquireDatasetWriteLock(LockList locks, String datasetName) throws AsterixException { String key = DATASET_PREFIX + datasetName; DatasetLock lock = (DatasetLock) mdlocks.computeIfAbsent(key, DATASET_LOCK_FUNCTION); locks.add(IMetadataLock.Mode.WRITE, lock); } + @Override public void acquireDatasetModifyLock(LockList locks, String datasetName) throws AsterixException { String key = DATASET_PREFIX + datasetName; DatasetLock lock = (DatasetLock) mdlocks.computeIfAbsent(key, DATASET_LOCK_FUNCTION); locks.add(IMetadataLock.Mode.MODIFY, lock); } + @Override public void acquireDatasetCreateIndexLock(LockList locks, String datasetName) throws AsterixException { - String key = DATASET_PREFIX + datasetName; - DatasetLock lock = (DatasetLock) mdlocks.computeIfAbsent(key, DATASET_LOCK_FUNCTION); + String dsKey = DATASET_PREFIX + datasetName; + DatasetLock lock = (DatasetLock) mdlocks.computeIfAbsent(dsKey, DATASET_LOCK_FUNCTION); locks.add(IMetadataLock.Mode.INDEX_BUILD, lock); } - public void acquireExternalDatasetRefreshLock(LockList locks, String datasetName) throws AsterixException { + @Override + public void acquireDatasetExclusiveModificationLock(LockList locks, String datasetName) throws AsterixException { String key = DATASET_PREFIX + datasetName; DatasetLock lock = (DatasetLock) mdlocks.computeIfAbsent(key, DATASET_LOCK_FUNCTION); - locks.add(IMetadataLock.Mode.INDEX_BUILD, lock); + locks.add(IMetadataLock.Mode.EXCLUSIVE_MODIFY, lock); } - // Function + @Override public void acquireFunctionReadLock(LockList locks, String functionName) throws AsterixException { String key = FUNCTION_PREFIX + functionName; IMetadataLock lock = mdlocks.computeIfAbsent(key, LOCK_FUNCTION); locks.add(IMetadataLock.Mode.READ, lock); } + @Override public void acquireFunctionWriteLock(LockList locks, String functionName) throws AsterixException { String key = FUNCTION_PREFIX + functionName; IMetadataLock lock = mdlocks.computeIfAbsent(key, LOCK_FUNCTION); locks.add(IMetadataLock.Mode.WRITE, lock); } - // Node Group + @Override public void acquireNodeGroupReadLock(LockList locks, String nodeGroupName) throws AsterixException { String key = NODE_GROUP_PREFIX + nodeGroupName; IMetadataLock lock = mdlocks.computeIfAbsent(key, LOCK_FUNCTION); locks.add(IMetadataLock.Mode.READ, lock); } + @Override public void acquireNodeGroupWriteLock(LockList locks, String nodeGroupName) throws AsterixException { String key = NODE_GROUP_PREFIX + nodeGroupName; IMetadataLock lock = mdlocks.computeIfAbsent(key, LOCK_FUNCTION); locks.add(IMetadataLock.Mode.WRITE, lock); } - // Feeds - public void acquireFeedReadLock(LockList locks, String feedName) throws AsterixException { - String key = FEED_PREFIX + feedName; + @Override + public void acquireActiveEntityReadLock(LockList locks, String entityName) throws AsterixException { + String key = ACTIVE_PREFIX + entityName; IMetadataLock lock = mdlocks.computeIfAbsent(key, LOCK_FUNCTION); locks.add(IMetadataLock.Mode.READ, lock); } - public void acquireFeedWriteLock(LockList locks, String feedName) throws AsterixException { - String key = FEED_PREFIX + feedName; + @Override + public void acquireActiveEntityWriteLock(LockList locks, String entityName) throws AsterixException { + String key = ACTIVE_PREFIX + entityName; IMetadataLock lock = mdlocks.computeIfAbsent(key, LOCK_FUNCTION); locks.add(IMetadataLock.Mode.WRITE, lock); } + @Override public void acquireFeedPolicyWriteLock(LockList locks, String feedPolicyName) throws AsterixException { String key = FEED_POLICY_PREFIX + feedPolicyName; IMetadataLock lock = mdlocks.computeIfAbsent(key, LOCK_FUNCTION); locks.add(IMetadataLock.Mode.WRITE, lock); } + @Override public void acquireFeedPolicyReadLock(LockList locks, String feedPolicyName) throws AsterixException { String key = FEED_POLICY_PREFIX + feedPolicyName; IMetadataLock lock = mdlocks.computeIfAbsent(key, LOCK_FUNCTION); locks.add(IMetadataLock.Mode.READ, lock); } - // CompactionPolicy + @Override public void acquireMergePolicyReadLock(LockList locks, String mergePolicyName) throws AsterixException { String key = MERGE_POLICY_PREFIX + mergePolicyName; IMetadataLock lock = mdlocks.computeIfAbsent(key, LOCK_FUNCTION); locks.add(IMetadataLock.Mode.READ, lock); } + @Override public void acquireMergePolicyWriteLock(LockList locks, String mergePolicyName) throws AsterixException { String key = MERGE_POLICY_PREFIX + mergePolicyName; IMetadataLock lock = mdlocks.computeIfAbsent(key, LOCK_FUNCTION); locks.add(IMetadataLock.Mode.WRITE, lock); } - // DataType + @Override public void acquireDataTypeReadLock(LockList locks, String datatypeName) throws AsterixException { String key = DATATYPE_PREFIX + datatypeName; IMetadataLock lock = mdlocks.computeIfAbsent(key, LOCK_FUNCTION); locks.add(IMetadataLock.Mode.READ, lock); } + @Override public void acquireDataTypeWriteLock(LockList locks, String datatypeName) throws AsterixException { String key = DATATYPE_PREFIX + datatypeName; IMetadataLock lock = mdlocks.computeIfAbsent(key, LOCK_FUNCTION); locks.add(IMetadataLock.Mode.WRITE, lock); } - // Extensions - public void acquireExtensionReadLock(LockList locks, String extensionName) throws AsterixException { - String key = EXTENSION_PREFIX + extensionName; + @Override + public void acquireExtensionReadLock(LockList locks, String extension, String entityName) throws AsterixException { + String key = EXTENSION_PREFIX + extension + entityName; IMetadataLock lock = mdlocks.computeIfAbsent(key, LOCK_FUNCTION); locks.add(IMetadataLock.Mode.READ, lock); } - public void acquireExtensionWriteLock(LockList locks, String extensionName) throws AsterixException { - String key = EXTENSION_PREFIX + extensionName; + @Override + public void acquireExtensionWriteLock(LockList locks, String extension, String entityName) throws AsterixException { + String key = EXTENSION_PREFIX + extension + entityName; IMetadataLock lock = mdlocks.computeIfAbsent(key, LOCK_FUNCTION); locks.add(IMetadataLock.Mode.WRITE, lock); } - public void createDatasetBegin(LockList locks, String dataverseName, String itemTypeDataverseName, - String itemTypeFullyQualifiedName, String metaItemTypeDataverseName, String metaItemTypeFullyQualifiedName, - String nodeGroupName, String compactionPolicyName, String datasetFullyQualifiedName, - boolean isDefaultCompactionPolicy) throws AsterixException { - acquireDataverseReadLock(locks, dataverseName); - if (!dataverseName.equals(itemTypeDataverseName)) { - acquireDataverseReadLock(locks, itemTypeDataverseName); - } - if (metaItemTypeDataverseName != null && !metaItemTypeDataverseName.equals(dataverseName) - && !metaItemTypeDataverseName.equals(itemTypeDataverseName)) { - acquireDataverseReadLock(locks, metaItemTypeDataverseName); - } - acquireDataTypeReadLock(locks, itemTypeFullyQualifiedName); - if (metaItemTypeFullyQualifiedName != null - && !metaItemTypeFullyQualifiedName.equals(itemTypeFullyQualifiedName)) { - acquireDataTypeReadLock(locks, metaItemTypeFullyQualifiedName); - } - if (nodeGroupName != null) { - acquireNodeGroupReadLock(locks, nodeGroupName); - } - if (!isDefaultCompactionPolicy) { - acquireMergePolicyReadLock(locks, compactionPolicyName); - } - acquireDatasetWriteLock(locks, datasetFullyQualifiedName); - } - - public void createIndexBegin(LockList locks, String dataverseName, String datasetFullyQualifiedName) - throws AsterixException { - acquireDataverseReadLock(locks, dataverseName); - acquireDatasetCreateIndexLock(locks, datasetFullyQualifiedName); - } - - public void createTypeBegin(LockList locks, String dataverseName, String itemTypeFullyQualifiedName) - throws AsterixException { - acquireDataverseReadLock(locks, dataverseName); - acquireDataTypeWriteLock(locks, itemTypeFullyQualifiedName); - } - - public void dropDatasetBegin(LockList locks, String dataverseName, String datasetFullyQualifiedName) - throws AsterixException { - acquireDataverseReadLock(locks, dataverseName); - acquireDatasetWriteLock(locks, datasetFullyQualifiedName); - } - - public void dropIndexBegin(LockList locks, String dataverseName, String datasetFullyQualifiedName) - throws AsterixException { - acquireDataverseReadLock(locks, dataverseName); - acquireDatasetWriteLock(locks, datasetFullyQualifiedName); - } - - public void dropTypeBegin(LockList locks, String dataverseName, String dataTypeFullyQualifiedName) - throws AsterixException { - acquireDataverseReadLock(locks, dataverseName); - acquireDataTypeWriteLock(locks, dataTypeFullyQualifiedName); - } - - public void functionStatementBegin(LockList locks, String dataverseName, String functionFullyQualifiedName) - throws AsterixException { - acquireDataverseReadLock(locks, dataverseName); - acquireFunctionWriteLock(locks, functionFullyQualifiedName); - } - - public void modifyDatasetBegin(LockList locks, String dataverseName, String datasetFullyQualifiedName) - throws AsterixException { - acquireDataverseReadLock(locks, dataverseName); - acquireDatasetModifyLock(locks, datasetFullyQualifiedName); - } - - public void insertDeleteUpsertBegin(LockList locks, String datasetFullyQualifiedName) throws AsterixException { - acquireDataverseReadLock(locks, DatasetUtil.getDataverseFromFullyQualifiedName(datasetFullyQualifiedName)); - acquireDatasetModifyLock(locks, datasetFullyQualifiedName); - } - - public void dropFeedBegin(LockList locks, String dataverseName, String feedFullyQualifiedName) - throws AsterixException { - acquireDataverseReadLock(locks, dataverseName); - acquireFeedWriteLock(locks, feedFullyQualifiedName); - } - - public void dropFeedPolicyBegin(LockList locks, String dataverseName, String policyName) throws AsterixException { - acquireFeedWriteLock(locks, policyName); - acquireDataverseReadLock(locks, dataverseName); - } - - public void startFeedBegin(LockList locks, String dataverseName, String feedName, - List<FeedConnection> feedConnections) throws AsterixException { - acquireDataverseReadLock(locks, dataverseName); - acquireFeedReadLock(locks, feedName); - for (FeedConnection feedConnection : feedConnections) { - // what if the dataset is in a different dataverse - String fqName = dataverseName + "." + feedConnection.getDatasetName(); - acquireDatasetReadLock(locks, fqName); - } - } - - public void stopFeedBegin(LockList locks, String dataverseName, String feedName) throws AsterixException { - // TODO: dataset lock? - // Dataset locks are not required here since datasets are protected by the active event listener - acquireDataverseReadLock(locks, dataverseName); - acquireFeedReadLock(locks, feedName); - } - - public void createFeedBegin(LockList locks, String dataverseName, String feedFullyQualifiedName) - throws AsterixException { - acquireDataverseReadLock(locks, dataverseName); - acquireFeedWriteLock(locks, feedFullyQualifiedName); - } - - public void connectFeedBegin(LockList locks, String dataverseName, String datasetFullyQualifiedName, - String feedFullyQualifiedName) throws AsterixException { - acquireDataverseReadLock(locks, dataverseName); - acquireDatasetReadLock(locks, datasetFullyQualifiedName); - acquireFeedReadLock(locks, feedFullyQualifiedName); - } - - public void createFeedPolicyBegin(LockList locks, String dataverseName, String policyName) throws AsterixException { - acquireDataverseReadLock(locks, dataverseName); - acquireFeedPolicyWriteLock(locks, policyName); - } - - public void disconnectFeedBegin(LockList locks, String dataverseName, String datasetFullyQualifiedName, - String feedFullyQualifiedName) throws AsterixException { - acquireDataverseReadLock(locks, dataverseName); - acquireDatasetReadLock(locks, datasetFullyQualifiedName); - acquireFeedReadLock(locks, feedFullyQualifiedName); - } - - public void compactBegin(LockList locks, String dataverseName, String datasetFullyQualifiedName) - throws AsterixException { - acquireDataverseReadLock(locks, dataverseName); - acquireDatasetReadLock(locks, datasetFullyQualifiedName); + @Override + public void upgradeDatasetLockToWrite(LockList locks, String fullyQualifiedName) throws AlgebricksException { + String key = DATASET_PREFIX + fullyQualifiedName; + DatasetLock lock = (DatasetLock) mdlocks.computeIfAbsent(key, DATASET_LOCK_FUNCTION); + locks.upgrade(IMetadataLock.Mode.UPGRADED_WRITE, lock); } - public void refreshDatasetBegin(LockList locks, String dataverseName, String datasetFullyQualifiedName) - throws AsterixException { - acquireDataverseReadLock(locks, dataverseName); - acquireExternalDatasetRefreshLock(locks, datasetFullyQualifiedName); + @Override + public void downgradeDatasetLockToExclusiveModify(LockList locks, String fullyQualifiedName) + throws AlgebricksException { + String key = DATASET_PREFIX + fullyQualifiedName; + DatasetLock lock = (DatasetLock) mdlocks.computeIfAbsent(key, DATASET_LOCK_FUNCTION); + locks.downgrade(IMetadataLock.Mode.EXCLUSIVE_MODIFY, lock); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java index 154e1b5..e4a6ca8 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java @@ -34,14 +34,15 @@ import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory; import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.context.ITransactionSubsystemProvider; import org.apache.asterix.common.context.TransactionSubsystemProvider; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.exceptions.MetadataException; import org.apache.asterix.common.transactions.IRecoveryManager; import org.apache.asterix.common.transactions.JobId; import org.apache.asterix.external.indexing.IndexingConstants; import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; import org.apache.asterix.formats.nontagged.TypeTraitProvider; -import org.apache.asterix.metadata.MetadataException; import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.metadata.MetadataTransactionContext; import org.apache.asterix.metadata.declared.MetadataProvider; @@ -51,7 +52,6 @@ import org.apache.asterix.metadata.entities.Dataverse; import org.apache.asterix.metadata.entities.Index; import org.apache.asterix.metadata.entities.InternalDatasetDetails; import org.apache.asterix.metadata.entities.NodeGroup; -import org.apache.asterix.metadata.lock.MetadataLockManager; import org.apache.asterix.om.base.AMutableString; import org.apache.asterix.om.base.AString; import org.apache.asterix.om.types.ARecordType; @@ -283,22 +283,22 @@ public class DatasetUtil { metaItemType = (ARecordType) metadataProvider.findMetaType(dataset); } JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext()); - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider - .getSplitProviderAndConstraints(dataset); + Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = + metadataProvider.getSplitProviderAndConstraints(dataset); FileSplit[] fs = splitsAndConstraint.first.getFileSplits(); StringBuilder sb = new StringBuilder(); for (int i = 0; i < fs.length; i++) { sb.append(fs[i] + " "); } LOGGER.info("CREATING File Splits: " + sb.toString()); - Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtil.getMergePolicyFactory(dataset, - metadataProvider.getMetadataTxnContext()); + Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = + DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext()); //prepare a LocalResourceMetadata which will be stored in NC's local resource repository IResourceFactory resourceFactory = dataset.getResourceFactory(metadataProvider, index, itemType, metaItemType, compactionInfo.first, compactionInfo.second); - IndexBuilderFactory indexBuilderFactory = new IndexBuilderFactory( - metadataProvider.getStorageComponentProvider().getStorageManager(), splitsAndConstraint.first, - resourceFactory, !dataset.isTemp()); + IndexBuilderFactory indexBuilderFactory = + new IndexBuilderFactory(metadataProvider.getStorageComponentProvider().getStorageManager(), + splitsAndConstraint.first, resourceFactory, !dataset.isTemp()); IndexCreateOperatorDescriptor indexCreateOp = new IndexCreateOperatorDescriptor(spec, indexBuilderFactory); AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp, splitsAndConstraint.second); @@ -344,8 +344,8 @@ public class DatasetUtil { */ public static IOperatorDescriptor createPrimaryIndexScanOp(JobSpecification spec, MetadataProvider metadataProvider, Dataset dataset, JobId jobId) throws AlgebricksException { - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadataProvider - .getSplitProviderAndConstraints(dataset); + Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = + metadataProvider.getSplitProviderAndConstraints(dataset); IFileSplitProvider primaryFileSplitProvider = primarySplitsAndConstraint.first; AlgebricksPartitionConstraint primaryPartitionConstraint = primarySplitsAndConstraint.second; // -Infinity @@ -396,8 +396,8 @@ public class DatasetUtil { try { Index primaryIndex = metadataProvider.getIndex(dataset.getDataverseName(), dataset.getDatasetName(), dataset.getDatasetName()); - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider - .getSplitProviderAndConstraints(dataset); + Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = + metadataProvider.getSplitProviderAndConstraints(dataset); // prepare callback JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId(); @@ -405,8 +405,8 @@ public class DatasetUtil { for (int i = 0; i < numKeys; i++) { primaryKeyFields[i] = i; } - boolean hasSecondaries = metadataProvider - .getDatasetIndexes(dataset.getDataverseName(), dataset.getDatasetName()).size() > 1; + boolean hasSecondaries = + metadataProvider.getDatasetIndexes(dataset.getDataverseName(), dataset.getDatasetName()).size() > 1; IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider(); IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory( storageComponentProvider, primaryIndex, jobId, IndexOperation.UPSERT, primaryKeyFields); @@ -426,8 +426,8 @@ public class DatasetUtil { f++; // add the previous meta second if (dataset.hasMetaPart()) { - outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider() - .getSerializerDeserializer(metaItemType); + outputSerDes[f] = + FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(metaItemType); outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(metaItemType); f++; } @@ -477,8 +477,8 @@ public class DatasetUtil { */ public static IOperatorDescriptor createDummyKeyProviderOp(JobSpecification spec, Dataset dataset, MetadataProvider metadataProvider) throws AlgebricksException { - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadataProvider - .getSplitProviderAndConstraints(dataset); + Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = + metadataProvider.getSplitProviderAndConstraints(dataset); AlgebricksPartitionConstraint primaryPartitionConstraint = primarySplitsAndConstraint.second; // Build dummy tuple containing one field with a dummy value inside. @@ -506,9 +506,8 @@ public class DatasetUtil { return datasetName.indexOf('.') > 0; //NOSONAR a fully qualified name can't start with a . } - public static String getDataverseFromFullyQualifiedName(String datasetName) { - int idx = datasetName.indexOf('.'); - return datasetName.substring(0, idx); + public static String getFullyQualifiedName(Dataset dataset) { + return dataset.getDataverseName() + '.' + dataset.getDatasetName(); } /*** @@ -548,13 +547,14 @@ public class DatasetUtil { */ public static String createNodeGroupForNewDataset(String dataverseName, String datasetName, long rebalanceCount, Set<String> ncNames, MetadataProvider metadataProvider) throws Exception { + ICcApplicationContext appCtx = metadataProvider.getApplicationContext(); String nodeGroup = dataverseName + "." + datasetName + (rebalanceCount == 0L ? "" : "_" + rebalanceCount); MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext(); - MetadataLockManager.INSTANCE.acquireNodeGroupWriteLock(metadataProvider.getLocks(), nodeGroup); + appCtx.getMetadataLockManager().acquireNodeGroupWriteLock(metadataProvider.getLocks(), nodeGroup); NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodeGroup); if (ng != null) { nodeGroup = nodeGroup + "_" + UUID.randomUUID().toString(); - MetadataLockManager.INSTANCE.acquireNodeGroupWriteLock(metadataProvider.getLocks(), nodeGroup); + appCtx.getMetadataLockManager().acquireNodeGroupWriteLock(metadataProvider.getLocks(), nodeGroup); } MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(nodeGroup, new ArrayList<>(ncNames))); return nodeGroup; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java new file mode 100644 index 0000000..4cf25f7 --- /dev/null +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.metadata.utils; + +import org.apache.asterix.common.api.IMetadataLockManager; +import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.metadata.LockList; + +public class MetadataLockUtil { + + private MetadataLockUtil() { + } + + public static void createDatasetBegin(IMetadataLockManager lockMgr, LockList locks, String dataverseName, + String itemTypeDataverseName, String itemTypeFullyQualifiedName, String metaItemTypeDataverseName, + String metaItemTypeFullyQualifiedName, String nodeGroupName, String compactionPolicyName, + String datasetFullyQualifiedName, boolean isDefaultCompactionPolicy) throws AsterixException { + lockMgr.acquireDataverseReadLock(locks, dataverseName); + if (!dataverseName.equals(itemTypeDataverseName)) { + lockMgr.acquireDataverseReadLock(locks, itemTypeDataverseName); + } + if (metaItemTypeDataverseName != null && !metaItemTypeDataverseName.equals(dataverseName) + && !metaItemTypeDataverseName.equals(itemTypeDataverseName)) { + lockMgr.acquireDataverseReadLock(locks, metaItemTypeDataverseName); + } + lockMgr.acquireDataTypeReadLock(locks, itemTypeFullyQualifiedName); + if (metaItemTypeFullyQualifiedName != null + && !metaItemTypeFullyQualifiedName.equals(itemTypeFullyQualifiedName)) { + lockMgr.acquireDataTypeReadLock(locks, metaItemTypeFullyQualifiedName); + } + if (nodeGroupName != null) { + lockMgr.acquireNodeGroupReadLock(locks, nodeGroupName); + } + if (!isDefaultCompactionPolicy) { + lockMgr.acquireMergePolicyReadLock(locks, compactionPolicyName); + } + lockMgr.acquireDatasetWriteLock(locks, datasetFullyQualifiedName); + } + + public static void createIndexBegin(IMetadataLockManager lockMgr, LockList locks, String dataverseName, + String datasetFullyQualifiedName) throws AsterixException { + lockMgr.acquireDataverseReadLock(locks, dataverseName); + lockMgr.acquireDatasetCreateIndexLock(locks, datasetFullyQualifiedName); + } + + public static void dropIndexBegin(IMetadataLockManager lockMgr, LockList locks, String dataverseName, + String datasetFullyQualifiedName) throws AsterixException { + lockMgr.acquireDataverseReadLock(locks, dataverseName); + lockMgr.acquireDatasetWriteLock(locks, datasetFullyQualifiedName); + } + + public static void createTypeBegin(IMetadataLockManager lockMgr, LockList locks, String dataverseName, + String itemTypeFullyQualifiedName) throws AsterixException { + lockMgr.acquireDataverseReadLock(locks, dataverseName); + lockMgr.acquireDataTypeWriteLock(locks, itemTypeFullyQualifiedName); + } + + public static void dropDatasetBegin(IMetadataLockManager lockMgr, LockList locks, String dataverseName, + String datasetFullyQualifiedName) throws AsterixException { + lockMgr.acquireDataverseReadLock(locks, dataverseName); + lockMgr.acquireDatasetWriteLock(locks, datasetFullyQualifiedName); + } + + public static void dropTypeBegin(IMetadataLockManager lockMgr, LockList locks, String dataverseName, + String dataTypeFullyQualifiedName) throws AsterixException { + lockMgr.acquireDataverseReadLock(locks, dataverseName); + lockMgr.acquireDataTypeWriteLock(locks, dataTypeFullyQualifiedName); + } + + public static void functionStatementBegin(IMetadataLockManager lockMgr, LockList locks, String dataverseName, + String functionFullyQualifiedName) throws AsterixException { + lockMgr.acquireDataverseReadLock(locks, dataverseName); + lockMgr.acquireFunctionWriteLock(locks, functionFullyQualifiedName); + } + + public static void modifyDatasetBegin(IMetadataLockManager lockMgr, LockList locks, String dataverseName, + String datasetFullyQualifiedName) throws AsterixException { + lockMgr.acquireDataverseReadLock(locks, dataverseName); + lockMgr.acquireDatasetModifyLock(locks, datasetFullyQualifiedName); + } + + public static void insertDeleteUpsertBegin(IMetadataLockManager lockMgr, LockList locks, + String datasetFullyQualifiedName) throws AsterixException { + lockMgr.acquireDataverseReadLock(locks, + MetadataUtil.getDataverseFromFullyQualifiedName(datasetFullyQualifiedName)); + lockMgr.acquireDatasetModifyLock(locks, datasetFullyQualifiedName); + } + + public static void dropFeedBegin(IMetadataLockManager lockMgr, LockList locks, String dataverseName, + String feedFullyQualifiedName) throws AsterixException { + lockMgr.acquireDataverseReadLock(locks, dataverseName); + lockMgr.acquireActiveEntityWriteLock(locks, feedFullyQualifiedName); + } + + public static void dropFeedPolicyBegin(IMetadataLockManager lockMgr, LockList locks, String dataverseName, + String policyName) throws AsterixException { + lockMgr.acquireActiveEntityWriteLock(locks, policyName); + lockMgr.acquireDataverseReadLock(locks, dataverseName); + } + + public static void startFeedBegin(IMetadataLockManager lockMgr, LockList locks, String dataverseName, + String feedName) throws AsterixException { + lockMgr.acquireDataverseReadLock(locks, dataverseName); + lockMgr.acquireActiveEntityReadLock(locks, feedName); + } + + public static void stopFeedBegin(IMetadataLockManager lockMgr, LockList locks, String dataverseName, + String feedName) throws AsterixException { + // TODO: dataset lock? + lockMgr.acquireDataverseReadLock(locks, dataverseName); + lockMgr.acquireActiveEntityReadLock(locks, feedName); + } + + public static void createFeedBegin(IMetadataLockManager lockMgr, LockList locks, String dataverseName, + String feedFullyQualifiedName) throws AsterixException { + lockMgr.acquireDataverseReadLock(locks, dataverseName); + lockMgr.acquireActiveEntityWriteLock(locks, feedFullyQualifiedName); + } + + public static void connectFeedBegin(IMetadataLockManager lockMgr, LockList locks, String dataverseName, + String datasetFullyQualifiedName, String feedFullyQualifiedName) throws AsterixException { + lockMgr.acquireDataverseReadLock(locks, dataverseName); + lockMgr.acquireActiveEntityReadLock(locks, feedFullyQualifiedName); + lockMgr.acquireDatasetReadLock(locks, datasetFullyQualifiedName); + } + + public static void createFeedPolicyBegin(IMetadataLockManager lockMgr, LockList locks, String dataverseName, + String policyName) throws AsterixException { + lockMgr.acquireDataverseReadLock(locks, dataverseName); + lockMgr.acquireFeedPolicyWriteLock(locks, policyName); + } + + public static void disconnectFeedBegin(IMetadataLockManager lockMgr, LockList locks, String dataverseName, + String datasetFullyQualifiedName, String feedFullyQualifiedName) throws AsterixException { + lockMgr.acquireDataverseReadLock(locks, dataverseName); + lockMgr.acquireActiveEntityReadLock(locks, feedFullyQualifiedName); + lockMgr.acquireDatasetReadLock(locks, datasetFullyQualifiedName); + } + + public static void compactBegin(IMetadataLockManager lockMgr, LockList locks, String dataverseName, + String datasetFullyQualifiedName) throws AsterixException { + lockMgr.acquireDataverseReadLock(locks, dataverseName); + lockMgr.acquireDatasetReadLock(locks, datasetFullyQualifiedName); + } + + public static void refreshDatasetBegin(IMetadataLockManager lockMgr, LockList locks, String dataverseName, + String datasetFullyQualifiedName) throws AsterixException { + lockMgr.acquireDataverseReadLock(locks, dataverseName); + lockMgr.acquireDatasetExclusiveModificationLock(locks, datasetFullyQualifiedName); + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataUtil.java index 3133aba..e5d4721 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataUtil.java @@ -38,4 +38,9 @@ public class MetadataUtil { return "Unknown Pending Operation"; } } + + public static String getDataverseFromFullyQualifiedName(String datasetName) { + int idx = datasetName.indexOf('.'); + return datasetName.substring(0, idx); + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java index e634d4e..6825f10 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java @@ -24,11 +24,12 @@ import java.util.List; import org.apache.asterix.common.cluster.ClusterPartition; import org.apache.asterix.common.config.ClusterProperties; +import org.apache.asterix.common.exceptions.MetadataException; import org.apache.asterix.common.utils.StoragePathUtil; -import org.apache.asterix.metadata.MetadataException; import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.metadata.MetadataTransactionContext; import org.apache.asterix.metadata.entities.Dataset; +import org.apache.asterix.metadata.entities.NodeGroup; import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; @@ -59,12 +60,12 @@ public class SplitsAndConstraintsUtil { public static FileSplit[] getIndexSplits(Dataset dataset, String indexName, MetadataTransactionContext mdTxnCtx) throws AlgebricksException { try { - List<String> nodeGroup = - MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName()).getNodeNames(); + NodeGroup nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName()); if (nodeGroup == null) { throw new AlgebricksException("Couldn't find node group " + dataset.getNodeGroupName()); } - return getIndexSplits(dataset, indexName, nodeGroup); + List<String> nodeList = nodeGroup.getNodeNames(); + return getIndexSplits(dataset, indexName, nodeList); } catch (MetadataException me) { throw new AlgebricksException(me); } @@ -85,11 +86,10 @@ public class SplitsAndConstraintsUtil { for (int k = 0; k < numPartitions; k++) { // format: 'storage dir name'/partition_#/dataverse/dataset_idx_index - File f = new File( - StoragePathUtil.prepareStoragePartitionPath(storageDirName, nodePartitions[k].getPartitionId()) - + (dataset.isTemp() ? (File.separator + StoragePathUtil.TEMP_DATASETS_STORAGE_FOLDER) - : "") - + File.separator + relPathFile); + File f = new File(StoragePathUtil.prepareStoragePartitionPath(storageDirName, + nodePartitions[k].getPartitionId()) + + (dataset.isTemp() ? (File.separator + StoragePathUtil.TEMP_DATASETS_STORAGE_FOLDER) : "") + + File.separator + relPathFile); splits.add(StoragePathUtil.getFileSplitForClusterPartition(nodePartitions[k], f.getPath())); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/MetadataEntityValueExtractor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/MetadataEntityValueExtractor.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/MetadataEntityValueExtractor.java index 5bedffa..ff65994 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/MetadataEntityValueExtractor.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/MetadataEntityValueExtractor.java @@ -21,8 +21,8 @@ package org.apache.asterix.metadata.valueextractors; import java.rmi.RemoteException; +import org.apache.asterix.common.exceptions.MetadataException; import org.apache.asterix.common.transactions.JobId; -import org.apache.asterix.metadata.MetadataException; import org.apache.asterix.metadata.api.IMetadataEntityTupleTranslator; import org.apache.asterix.metadata.api.IValueExtractor; import org.apache.hyracks.api.exceptions.HyracksDataException; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/NestedDatatypeNameValueExtractor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/NestedDatatypeNameValueExtractor.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/NestedDatatypeNameValueExtractor.java index 32bed0d..5f0525b 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/NestedDatatypeNameValueExtractor.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/NestedDatatypeNameValueExtractor.java @@ -24,8 +24,8 @@ import java.io.DataInput; import java.io.DataInputStream; import java.io.IOException; +import org.apache.asterix.common.exceptions.MetadataException; import org.apache.asterix.common.transactions.JobId; -import org.apache.asterix.metadata.MetadataException; import org.apache.asterix.metadata.api.IValueExtractor; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/TupleCopyValueExtractor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/TupleCopyValueExtractor.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/TupleCopyValueExtractor.java index fcf69d5..1928d7e 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/TupleCopyValueExtractor.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/valueextractors/TupleCopyValueExtractor.java @@ -21,8 +21,8 @@ package org.apache.asterix.metadata.valueextractors; import java.nio.ByteBuffer; +import org.apache.asterix.common.exceptions.MetadataException; import org.apache.asterix.common.transactions.JobId; -import org.apache.asterix.metadata.MetadataException; import org.apache.asterix.metadata.api.IValueExtractor; import org.apache.hyracks.api.dataflow.value.ITypeTraits; import org.apache.hyracks.api.exceptions.HyracksDataException; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslatorTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslatorTest.java b/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslatorTest.java index 6f3bd30..e5d0d7d 100644 --- a/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslatorTest.java +++ b/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslatorTest.java @@ -24,7 +24,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.asterix.common.config.DatasetConfig.DatasetType; -import org.apache.asterix.metadata.MetadataException; +import org.apache.asterix.common.exceptions.MetadataException; import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.metadata.entities.InternalDatasetDetails; import org.apache.asterix.metadata.entities.InternalDatasetDetails.FileStructure; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslatorTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslatorTest.java b/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslatorTest.java index fe76c6b..cdbaad3 100644 --- a/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslatorTest.java +++ b/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslatorTest.java @@ -30,7 +30,7 @@ import java.util.Map; import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.config.DatasetConfig.IndexType; -import org.apache.asterix.metadata.MetadataException; +import org.apache.asterix.common.exceptions.MetadataException; import org.apache.asterix.metadata.MetadataNode; import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.metadata.entities.Datatype; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java index 4cd243c..28c480f 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java @@ -21,6 +21,8 @@ package org.apache.asterix.runtime.utils; import java.io.IOException; import java.util.function.Supplier; +import org.apache.asterix.common.api.IMetadataLockManager; +import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.cluster.IGlobalRecoveryManager; import org.apache.asterix.common.config.ActiveProperties; import org.apache.asterix.common.config.BuildProperties; @@ -74,12 +76,13 @@ public class CcApplicationContext implements ICcApplicationContext { private Object extensionManager; private IFaultToleranceStrategy ftStrategy; private IJobLifecycleListener activeLifeCycleListener; + private IMetadataLockManager mdLockManager; public CcApplicationContext(ICCServiceContext ccServiceCtx, IHyracksClientConnection hcc, ILibraryManager libraryManager, IResourceIdManager resourceIdManager, Supplier<IMetadataBootstrap> metadataBootstrapSupplier, IGlobalRecoveryManager globalRecoveryManager, IFaultToleranceStrategy ftStrategy, IJobLifecycleListener activeLifeCycleListener, - IStorageComponentProvider storageComponentProvider) + IStorageComponentProvider storageComponentProvider, IMetadataLockManager mdLockManager) throws AsterixException, IOException { this.ccServiceCtx = ccServiceCtx; this.hcc = hcc; @@ -105,6 +108,7 @@ public class CcApplicationContext implements ICcApplicationContext { this.metadataBootstrapSupplier = metadataBootstrapSupplier; this.globalRecoveryManager = globalRecoveryManager; this.storageComponentProvider = storageComponentProvider; + this.mdLockManager = mdLockManager; } @Override @@ -210,7 +214,7 @@ public class CcApplicationContext implements ICcApplicationContext { } @Override - public IJobLifecycleListener getActiveLifecycleListener() { + public IJobLifecycleListener getActiveNotificationHandler() { return activeLifeCycleListener; } @@ -218,4 +222,14 @@ public class CcApplicationContext implements ICcApplicationContext { public IStorageComponentProvider getStorageComponentProvider() { return storageComponentProvider; } + + @Override + public IMetadataLockManager getMetadataLockManager() { + return mdLockManager; + } + + @Override + public IClusterStateManager getClusterStateManager() { + return ClusterStateManager.INSTANCE; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java index da341af..4717a7b 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java @@ -66,17 +66,11 @@ public class ClusterStateManager implements IClusterStateManager { public static final ClusterStateManager INSTANCE = new ClusterStateManager(); private final Map<String, Map<IOption, Object>> activeNcConfiguration = new HashMap<>(); private Set<String> pendingRemoval = new HashSet<>(); - private final Cluster cluster; private ClusterState state = ClusterState.UNUSABLE; - private AlgebricksAbsolutePartitionConstraint clusterPartitionConstraint; - - private boolean globalRecoveryCompleted = false; - private Map<String, ClusterPartition[]> node2PartitionsMap; private SortedMap<Integer, ClusterPartition> clusterPartitions; - private String currentMetadataNode = null; private boolean metadataNodeActive = false; private Set<String> failedNodes = new HashSet<>(); @@ -117,7 +111,9 @@ public class ClusterStateManager implements IClusterStateManager { @Override public synchronized void setState(ClusterState state) { + LOGGER.info("updating cluster state from " + this.state + " to " + state.name()); this.state = state; + appCtx.getGlobalRecoveryManager().notifyStateChange(state); LOGGER.info("Cluster State is now " + state.name()); // Notify any waiting threads for the cluster state to change. notifyAll(); @@ -262,16 +258,8 @@ public class ClusterStateManager implements IClusterStateManager { clusterActiveLocations.add(p.getActiveNodeId()); } } - clusterPartitionConstraint = new AlgebricksAbsolutePartitionConstraint( - clusterActiveLocations.toArray(new String[] {})); - } - - public boolean isGlobalRecoveryCompleted() { - return globalRecoveryCompleted; - } - - public void setGlobalRecoveryCompleted(boolean globalRecoveryCompleted) { - this.globalRecoveryCompleted = globalRecoveryCompleted; + clusterPartitionConstraint = + new AlgebricksAbsolutePartitionConstraint(clusterActiveLocations.toArray(new String[] {})); } public boolean isClusterActive() { @@ -384,13 +372,12 @@ public class ClusterStateManager implements IClusterStateManager { @Override public synchronized void deregisterNodePartitions(String nodeId) { - ClusterPartition [] nodePartitions = node2PartitionsMap.remove(nodeId); + ClusterPartition[] nodePartitions = node2PartitionsMap.remove(nodeId); if (nodePartitions == null) { LOGGER.info("deregisterNodePartitions unknown node " + nodeId + " (already removed?)"); } else { if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("deregisterNodePartitions for node " + nodeId + ": " + - Arrays.toString(nodePartitions)); + LOGGER.info("deregisterNodePartitions for node " + nodeId + ": " + Arrays.toString(nodePartitions)); } for (ClusterPartition nodePartition : nodePartitions) { clusterPartitions.remove(nodePartition.getPartitionId()); @@ -413,7 +400,7 @@ public class ClusterStateManager implements IClusterStateManager { LOGGER.info("Deregistering intention to remove node id " + nodeId); } if (!pendingRemoval.remove(nodeId)) { - LOGGER.warning("Cannot deregister intention to remove node id " + nodeId + " that was not registered"); + LOGGER.warning("Cannot deregister intention to remove node id " + nodeId + " that was not registered"); return false; } else { return true; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-tools/src/test/java/org/apache/asterix/tools/datagen/AdmDataGen.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-tools/src/test/java/org/apache/asterix/tools/datagen/AdmDataGen.java b/asterixdb/asterix-tools/src/test/java/org/apache/asterix/tools/datagen/AdmDataGen.java index e4a94a9..3b2d0ce 100644 --- a/asterixdb/asterix-tools/src/test/java/org/apache/asterix/tools/datagen/AdmDataGen.java +++ b/asterixdb/asterix-tools/src/test/java/org/apache/asterix/tools/datagen/AdmDataGen.java @@ -47,13 +47,13 @@ import org.apache.asterix.common.annotations.TypeDataGen; import org.apache.asterix.common.annotations.UndeclaredFieldsDataGen; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.exceptions.MetadataException; import org.apache.asterix.common.transactions.JobId; import org.apache.asterix.lang.aql.parser.AQLParserFactory; import org.apache.asterix.lang.aql.parser.ParseException; import org.apache.asterix.lang.common.base.IParser; import org.apache.asterix.lang.common.base.IParserFactory; import org.apache.asterix.lang.common.base.Statement; -import org.apache.asterix.metadata.MetadataException; import org.apache.asterix.metadata.MetadataTransactionContext; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.ATypeTag; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-tools/src/test/java/org/apache/asterix/tools/translator/ADGenDmlTranslator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-tools/src/test/java/org/apache/asterix/tools/translator/ADGenDmlTranslator.java b/asterixdb/asterix-tools/src/test/java/org/apache/asterix/tools/translator/ADGenDmlTranslator.java index a525b1b..94389b2 100644 --- a/asterixdb/asterix-tools/src/test/java/org/apache/asterix/tools/translator/ADGenDmlTranslator.java +++ b/asterixdb/asterix-tools/src/test/java/org/apache/asterix/tools/translator/ADGenDmlTranslator.java @@ -24,10 +24,10 @@ import java.util.Map; import org.apache.asterix.common.annotations.TypeDataGen; import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.exceptions.MetadataException; import org.apache.asterix.lang.common.base.Statement; import org.apache.asterix.lang.common.statement.DataverseDecl; import org.apache.asterix.lang.common.statement.TypeDecl; -import org.apache.asterix.metadata.MetadataException; import org.apache.asterix.metadata.MetadataTransactionContext; import org.apache.asterix.om.types.IAType; import org.apache.asterix.om.types.TypeSignature; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java index 443edf1..f5695a5 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java @@ -46,7 +46,7 @@ public class UpsertOperationCallback extends AbstractIndexModificationOperationC int pkHash = computePrimaryKeyHashValue(after, primaryKeyFields); log(pkHash, after, before); } catch (ACIDException e) { - throw new HyracksDataException(e); + throw HyracksDataException.create(e); } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java index 3119ddd..008f0be 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java @@ -24,21 +24,23 @@ import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.job.JobId; public interface IDatasetPartitionManager extends IDatasetManager { - public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult, + IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult, boolean asyncMode, int partition, int nPartitions) throws HyracksException; - public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, int partition, int nPartitions, + void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, int partition, int nPartitions, boolean orderedResult, boolean emptyResult) throws HyracksException; - public void reportPartitionWriteCompletion(JobId jobId, ResultSetId resultSetId, int partition) - throws HyracksException; + void reportPartitionWriteCompletion(JobId jobId, ResultSetId resultSetId, int partition) throws HyracksException; - public void initializeDatasetPartitionReader(JobId jobId, ResultSetId resultSetId, int partition, IFrameWriter noc) + void initializeDatasetPartitionReader(JobId jobId, ResultSetId resultSetId, int partition, IFrameWriter noc) throws HyracksException; - public void removePartition(JobId jobId, ResultSetId resultSetId, int partition); + void removePartition(JobId jobId, ResultSetId resultSetId, int partition); + + void abortReader(JobId jobId); + + void abortAllReaders(); - public void abortReader(JobId jobId); + void close(); - public void close(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java index c9cdb2d..0a99ea6 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java @@ -53,11 +53,6 @@ public class HyracksDataException extends HyracksException { return new HyracksDataException(ErrorCode.HYRACKS, code, ErrorCode.getErrorMessage(code), cause, params); } - public static HyracksDataException create(HyracksDataException e, String nodeId) { - return new HyracksDataException(e.getComponent(), e.getErrorCode(), e.getMessage(), e.getCause(), nodeId, - e.getParams()); - } - public static HyracksDataException suppress(HyracksDataException root, Throwable th) { if (root == null) { return HyracksDataException.create(th); @@ -124,7 +119,7 @@ public class HyracksDataException extends HyracksException { } public HyracksDataException(String component, int errorCode, Throwable cause, Serializable... params) { - super(component, errorCode, cause.getMessage(), cause, null, params); + super(component, errorCode, cause, params); } public HyracksDataException(String component, int errorCode, String message, Throwable cause, @@ -132,4 +127,8 @@ public class HyracksDataException extends HyracksException { super(component, errorCode, message, cause, null, params); } + public static HyracksDataException create(HyracksDataException e, String nodeId) { + return new HyracksDataException(e.getComponent(), e.getErrorCode(), e.getMessage(), e.getCause(), nodeId, + e.getParams()); + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java index 30ffebe..338c331 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java @@ -18,12 +18,38 @@ */ package org.apache.hyracks.api.job; +import java.util.List; + import org.apache.hyracks.api.exceptions.HyracksException; +/** + * A listener for job related events + */ public interface IJobLifecycleListener { - public void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException; + /** + * Notify the listener that a job has been created + * + * @param jobId + * @param spec + * @throws HyracksException + */ + void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException; - public void notifyJobStart(JobId jobId) throws HyracksException; + /** + * Notify the listener that the job has started on the cluster controller + * + * @param jobId + * @throws HyracksException + */ + void notifyJobStart(JobId jobId) throws HyracksException; - public void notifyJobFinish(JobId jobId) throws HyracksException; + /** + * Notify the listener that the job has been terminated, passing exceptions in case of failure + * + * @param jobId + * @param jobStatus + * @param exceptions + * @throws HyracksException + */ + void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) throws HyracksException; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java index 627f406..e1c218f 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java @@ -167,7 +167,7 @@ public class ClusterControllerService implements IControllerService { threadDumpRunMap = Collections.synchronizedMap(new HashMap<>()); // Node manager is in charge of cluster membership management. - nodeManager = new NodeManager(ccConfig, resourceManager); + nodeManager = new NodeManager(this, ccConfig, resourceManager); jobIdFactory = new JobIdFactory(); } @@ -193,9 +193,9 @@ public class ClusterControllerService implements IControllerService { clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.getClusterListenPort()), ccIPCI, new CCNCFunctions.SerializerDeserializer()); IIPCI ciIPCI = new ClientInterfaceIPCI(this, jobIdFactory); - clientIPC = new IPCSystem( - new InetSocketAddress(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort()), - ciIPCI, new JavaSerializationBasedPayloadSerializerDeserializer()); + clientIPC = + new IPCSystem(new InetSocketAddress(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort()), + ciIPCI, new JavaSerializationBasedPayloadSerializerDeserializer()); webServer = new WebServer(this, ccConfig.getConsoleListenPort()); clusterIPC.start(); clientIPC.start(); @@ -222,9 +222,9 @@ public class ClusterControllerService implements IControllerService { // Job manager is in charge of job lifecycle management. try { - Constructor<?> jobManagerConstructor = this.getClass().getClassLoader() - .loadClass(ccConfig.getJobManagerClass()) - .getConstructor(CCConfig.class, ClusterControllerService.class, IJobCapacityController.class); + Constructor<?> jobManagerConstructor = + this.getClass().getClassLoader().loadClass(ccConfig.getJobManagerClass()).getConstructor( + CCConfig.class, ClusterControllerService.class, IJobCapacityController.class); jobManager = (IJobManager) jobManagerConstructor.newInstance(ccConfig, this, jobCapacityController); } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) { @@ -263,13 +263,14 @@ public class ClusterControllerService implements IControllerService { @Override public void notifyNodeJoin(String nodeId, Map<IOption, Object> ncConfiguration) throws HyracksException { // no-op, we don't care + LOGGER.log(Level.WARNING, "Getting notified that node: " + nodeId + " has joined. and we don't care"); } @Override public void notifyNodeFailure(Collection<String> deadNodeIds) throws HyracksException { + LOGGER.log(Level.WARNING, "Getting notified that nodes: " + deadNodeIds + " has failed"); for (String nodeId : deadNodeIds) { Pair<String, Integer> ncService = getNCService(nodeId); - final TriggerNCWork triggerWork = new TriggerNCWork(ClusterControllerService.this, ncService.getLeft(), ncService.getRight(), nodeId); workQueue.schedule(triggerWork); @@ -396,8 +397,8 @@ public class ClusterControllerService implements IControllerService { @Override public void getIPAddressNodeMap(Map<InetAddress, Set<String>> map) throws HyracksDataException { - GetIpAddressNodeNameMapWork ginmw = new GetIpAddressNodeNameMapWork( - ClusterControllerService.this.getNodeManager(), map); + GetIpAddressNodeNameMapWork ginmw = + new GetIpAddressNodeNameMapWork(ClusterControllerService.this.getNodeManager(), map); try { workQueue.scheduleAndSync(ginmw); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java index 5075081..26245e1 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java @@ -36,6 +36,7 @@ import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.job.IJobLifecycleListener; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.api.job.JobStatus; import org.apache.hyracks.api.service.IControllerService; import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.common.application.ServiceContext; @@ -88,14 +89,14 @@ public class CCServiceContext extends ServiceContext implements ICCServiceContex } } - public synchronized void notifyJobFinish(JobId jobId) throws HyracksException { + public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) + throws HyracksException { for (IJobLifecycleListener l : jobLifecycleListeners) { - l.notifyJobFinish(jobId); + l.notifyJobFinish(jobId, jobStatus, exceptions); } } - public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec) - throws HyracksException { + public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException { for (IJobLifecycleListener l : jobLifecycleListeners) { l.notifyJobCreation(jobId, spec); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java index 47e78a3..8cca1e0 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java @@ -38,19 +38,24 @@ import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.resource.NodeCapacity; +import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.cc.NodeControllerState; +import org.apache.hyracks.control.cc.job.IJobManager; +import org.apache.hyracks.control.cc.job.JobRun; import org.apache.hyracks.control.cc.scheduler.IResourceManager; import org.apache.hyracks.control.common.controllers.CCConfig; public class NodeManager implements INodeManager { private static final Logger LOGGER = Logger.getLogger(NodeManager.class.getName()); + private final ClusterControllerService ccs; private final CCConfig ccConfig; private final IResourceManager resourceManager; private final Map<String, NodeControllerState> nodeRegistry; private final Map<InetAddress, Set<String>> ipAddressNodeNameMap; - public NodeManager(CCConfig ccConfig, IResourceManager resourceManager) { + public NodeManager(ClusterControllerService ccs, CCConfig ccConfig, IResourceManager resourceManager) { + this.ccs = ccs; this.ccConfig = ccConfig; this.resourceManager = resourceManager; this.nodeRegistry = new LinkedHashMap<>(); @@ -79,15 +84,18 @@ public class NodeManager implements INodeManager { @Override public void addNode(String nodeId, NodeControllerState ncState) throws HyracksException { + LOGGER.warning("addNode(" + nodeId + ") called"); if (nodeId == null || ncState == null) { throw HyracksException.create(ErrorCode.INVALID_INPUT_PARAMETER); } // Updates the node registry. if (nodeRegistry.containsKey(nodeId)) { - LOGGER.warning("Node with name " + nodeId + " has already registered; re-registering"); + LOGGER.warning( + "Node with name " + nodeId + " has already registered; failing the node then re-registering."); + removeDeadNode(nodeId); } + LOGGER.warning("adding node to registry"); nodeRegistry.put(nodeId, ncState); - // Updates the IP address to node names map. try { InetAddress ipAddress = getIpAddress(ncState); @@ -98,8 +106,8 @@ public class NodeManager implements INodeManager { nodeRegistry.remove(nodeId); throw e; } - // Updates the cluster capacity. + LOGGER.warning("updating cluster capacity"); resourceManager.update(nodeId, ncState.getCapacity()); } @@ -147,6 +155,27 @@ public class NodeManager implements INodeManager { return Pair.of(deadNodes, affectedJobIds); } + public void removeDeadNode(String nodeId) throws HyracksException { + NodeControllerState state = nodeRegistry.get(nodeId); + Set<JobId> affectedJobIds = state.getActiveJobIds(); + // Removes the node from node map. + nodeRegistry.remove(nodeId); + // Removes the node from IP map. + removeNodeFromIpAddressMap(nodeId, state); + // Updates the cluster capacity. + resourceManager.update(nodeId, new NodeCapacity(0L, 0)); + LOGGER.info(nodeId + " considered dead"); + IJobManager jobManager = ccs.getJobManager(); + Set<String> collection = Collections.singleton(nodeId); + for (JobId jobId : affectedJobIds) { + JobRun run = jobManager.get(jobId); + if (run != null) { + run.getExecutor().notifyNodeFailures(collection); + } + } + ccs.getContext().notifyNodeFailure(collection); + } + @Override public void apply(NodeFunction nodeFunction) { nodeRegistry.forEach(nodeFunction::apply); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java index 8401fcf..2685f60 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java @@ -41,6 +41,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.api.job.JobStatus; import org.apache.hyracks.control.cc.PreDistributedJobStore; import org.apache.hyracks.control.common.dataset.ResultStateSweeper; import org.apache.hyracks.control.common.work.IResultCallback; @@ -69,7 +70,7 @@ public class DatasetDirectoryService implements IDatasetDirectoryService { this.resultTTL = resultTTL; this.resultSweepThreshold = resultSweepThreshold; this.preDistributedJobStore = preDistributedJobStore; - jobResultLocations = new LinkedHashMap<JobId, JobResultInfo>(); + jobResultLocations = new LinkedHashMap<>(); } @Override @@ -94,7 +95,7 @@ public class DatasetDirectoryService implements IDatasetDirectoryService { } @Override - public void notifyJobFinish(JobId jobId) throws HyracksException { + public void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) throws HyracksException { // Auto-generated method stub } @@ -179,7 +180,7 @@ public class DatasetDirectoryService implements IDatasetDirectoryService { @Override public synchronized long getResultTimestamp(JobId jobId) { - if (preDistributedJobStore.jobIsPredistributed(jobId)){ + if (preDistributedJobStore.jobIsPredistributed(jobId)) { return -1; } return getState(jobId).getTimestamp(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java index 45c7711..c1a7899 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java @@ -132,8 +132,8 @@ public class JobManager implements IJobManager { // Removes a pending job. JobRun jobRun = jobQueue.remove(jobId); if (jobRun != null) { - List<Exception> exceptions = Collections - .singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, jobId)); + List<Exception> exceptions = + Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, jobId)); // Since the job has not been executed, we only need to update its status and lifecyle here. jobRun.setStatus(JobStatus.FAILURE, exceptions); runMapArchive.put(jobId, jobRun); @@ -179,7 +179,7 @@ public class JobManager implements IJobManager { } catch (Exception e) { LOGGER.log(Level.SEVERE, e.getMessage(), e); if (caughtException == null) { - caughtException = new HyracksException(e); + caughtException = HyracksException.create(e); } else { caughtException.addSuppressed(e); } @@ -208,7 +208,7 @@ public class JobManager implements IJobManager { CCServiceContext serviceCtx = ccs.getContext(); if (serviceCtx != null) { try { - serviceCtx.notifyJobFinish(jobId); + serviceCtx.notifyJobFinish(jobId, run.getPendingStatus(), run.getPendingExceptions()); } catch (HyracksException e) { LOGGER.log(Level.SEVERE, e.getMessage(), e); caughtException = e; @@ -249,8 +249,6 @@ public class JobManager implements IJobManager { } } - - @Override public Collection<JobRun> getRunningJobs() { return activeRunMap.values(); @@ -320,9 +318,8 @@ public class JobManager implements IJobManager { try { run.getExecutor().startJob(); } catch (Exception e) { - ccs.getWorkQueue() - .schedule(new JobCleanupWork(ccs.getJobManager(), run.getJobId(), JobStatus.FAILURE, - Collections.singletonList(e))); + ccs.getWorkQueue().schedule(new JobCleanupWork(ccs.getJobManager(), run.getJobId(), JobStatus.FAILURE, + Collections.singletonList(e))); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java index f44fca0..bf0846f 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java @@ -54,6 +54,7 @@ public class RegisterNodeWork extends SynchronizableWork { CCNCFunctions.NodeRegistrationResult result; Map<IOption, Object> ncConfiguration = new HashMap<>(); try { + LOGGER.log(Level.WARNING, "Registering INodeController: id = " + id); INodeController nc = new NodeControllerRemoteProxy(ccs.getClusterIPC(), reg.getNodeControllerAddress()); NodeControllerState state = new NodeControllerState(nc, reg); INodeManager nodeManager = ccs.getNodeManager(); @@ -71,9 +72,12 @@ public class RegisterNodeWork extends SynchronizableWork { result = new CCNCFunctions.NodeRegistrationResult(params, null); ccs.getJobIdFactory().ensureMinimumId(reg.getMaxJobId() + 1); } catch (Exception e) { + LOGGER.log(Level.WARNING, "Node registration failed", e); result = new CCNCFunctions.NodeRegistrationResult(null, e); } + LOGGER.warning("sending registration response to node"); ncIPCHandle.send(-1, result, null); + LOGGER.warning("notifying node join"); ccs.getContext().notifyNodeJoin(id, ncConfiguration); } }
