Repository: asterixdb Updated Branches: refs/heads/master 98b9d603e -> f9e6bae98
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f9e6bae9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java index 027f72c..5110d74 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java @@ -19,8 +19,12 @@ package org.apache.asterix.common.utils; import java.io.File; +import java.nio.file.Paths; +import java.util.function.Function; import org.apache.asterix.common.cluster.ClusterPartition; +import org.apache.asterix.common.storage.IndexPathElements; +import org.apache.asterix.common.storage.ResourceReference; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; import org.apache.hyracks.algebricks.common.utils.Pair; @@ -36,6 +40,7 @@ public class StoragePathUtil { private static final Logger LOGGER = Logger.getLogger(StoragePathUtil.class.getName()); public static final String PARTITION_DIR_PREFIX = "partition_"; public static final String DATASET_INDEX_NAME_SEPARATOR = "_idx_"; + private static Function<IndexPathElements, String> indexPathProvider; private StoragePathUtil() { } @@ -69,8 +74,10 @@ public class StoragePathUtil { } private static String prepareFullIndexName(String datasetName, String idxName, long rebalanceCount) { - return (rebalanceCount == 0 ? "" : rebalanceCount + File.separator) + datasetName + DATASET_INDEX_NAME_SEPARATOR - + idxName; + if (indexPathProvider != null) { + return indexPathProvider.apply(new IndexPathElements(datasetName, idxName, String.valueOf(rebalanceCount))); + } + return datasetName + File.separator + rebalanceCount + File.separator + idxName; } public static int getPartitionNumFromName(String name) { @@ -88,10 +95,7 @@ public class StoragePathUtil { * @return the file relative path starting from the partition directory */ public static String getIndexFileRelativePath(String fileAbsolutePath) { - String[] tokens = fileAbsolutePath.split(File.separator); - //partition/dataverse/idx/fileName - return tokens[tokens.length - 4] + File.separator + tokens[tokens.length - 3] + File.separator - + tokens[tokens.length - 2] + File.separator + tokens[tokens.length - 1]; + return ResourceReference.of(fileAbsolutePath).getRelativePath().toString(); } /** @@ -136,7 +140,10 @@ public class StoragePathUtil { * @return The index name */ public static String getIndexNameFromPath(String path) { - int idx = path.lastIndexOf(DATASET_INDEX_NAME_SEPARATOR); - return idx != -1 ? path.substring(idx + DATASET_INDEX_NAME_SEPARATOR.length()) : path; + return Paths.get(path).getFileName().toString(); + } + + public static void setIndexPathProvider(Function<IndexPathElements, String> indexPathProvider) { + StoragePathUtil.indexPathProvider = indexPathProvider; } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f9e6bae9/asterixdb/asterix-replication/pom.xml ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/pom.xml b/asterixdb/asterix-replication/pom.xml index 3138806..f209aae 100644 --- a/asterixdb/asterix-replication/pom.xml +++ b/asterixdb/asterix-replication/pom.xml @@ -43,11 +43,6 @@ <version>${project.version}</version> </dependency> <dependency> - <groupId>org.apache.asterix</groupId> - <artifactId>asterix-metadata</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> <groupId>org.apache.hyracks</groupId> <artifactId>hyracks-api</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f9e6bae9/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java index 9d8c351..3143284 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java @@ -54,7 +54,7 @@ import org.apache.asterix.common.replication.IReplicationManager; import org.apache.asterix.common.replication.IReplicationStrategy; import org.apache.asterix.common.replication.IReplicationThread; import org.apache.asterix.common.replication.ReplicaEvent; -import org.apache.asterix.common.storage.IndexFileProperties; +import org.apache.asterix.common.storage.DatasetResourceReference; import org.apache.asterix.common.transactions.IAppRuntimeContextProvider; import org.apache.asterix.common.transactions.ILogManager; import org.apache.asterix.common.transactions.LogRecord; @@ -392,7 +392,7 @@ public class ReplicationChannel extends Thread implements IReplicationChannel { //start sending files for (String filePath : filesList) { // Send only files of datasets that are replciated. - IndexFileProperties indexFileRef = localResourceRep.getIndexFileRef(filePath); + DatasetResourceReference indexFileRef = localResourceRep.getLocalResourceReference(filePath); if (!repStrategy.isMatch(indexFileRef.getDatasetId())) { continue; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f9e6bae9/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java index b0aa0fb..48c7083 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java @@ -62,7 +62,7 @@ import org.apache.asterix.common.replication.Replica; import org.apache.asterix.common.replication.Replica.ReplicaState; import org.apache.asterix.common.replication.ReplicaEvent; import org.apache.asterix.common.replication.ReplicationJob; -import org.apache.asterix.common.storage.IndexFileProperties; +import org.apache.asterix.common.storage.DatasetResourceReference; import org.apache.asterix.common.transactions.IAppRuntimeContextProvider; import org.apache.asterix.common.transactions.ILogManager; import org.apache.asterix.common.transactions.ILogRecord; @@ -280,7 +280,7 @@ public class ReplicationManager implements IReplicationManager { //all of the job's files belong to a single storage partition. //get any of them to determine the partition from the file path. String jobFile = job.getJobFiles().iterator().next(); - IndexFileProperties indexFileRef = localResourceRepo.getIndexFileRef(jobFile); + DatasetResourceReference indexFileRef = localResourceRepo.getLocalResourceReference(jobFile); if (!replicationStrategy.isMatch(indexFileRef.getDatasetId())) { return; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f9e6bae9/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java index a8b15d2..7ca6f2f 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java @@ -23,9 +23,11 @@ import java.io.DataOutputStream; import java.io.File; import java.io.IOException; import java.io.OutputStream; +import java.nio.file.Paths; import java.util.concurrent.atomic.AtomicInteger; import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback; +import org.apache.asterix.common.storage.ResourceReference; import org.apache.asterix.replication.logging.TxnLogUtil; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; @@ -93,18 +95,16 @@ public class LSMComponentProperties { return lsmCompProp; } - public String getMaskPath(ReplicaResourcesManager resourceManager) { + public String getMaskPath(ReplicaResourcesManager resourceManager) throws HyracksDataException { if (maskPath == null) { LSMIndexFileProperties afp = new LSMIndexFileProperties(this); - //split the index file path to get the LSM component file name - afp.splitFileName(); maskPath = getReplicaComponentPath(resourceManager) + File.separator + afp.getFileName() + ReplicaResourcesManager.LSM_COMPONENT_MASK_SUFFIX; } return maskPath; } - public String getReplicaComponentPath(ReplicaResourcesManager resourceManager) { + public String getReplicaComponentPath(ReplicaResourcesManager resourceManager) throws HyracksDataException { if (replicaPath == null) { LSMIndexFileProperties afp = new LSMIndexFileProperties(this); replicaPath = resourceManager.getIndexPath(afp); @@ -118,23 +118,10 @@ public class LSMComponentProperties { * @return a unique id based on the timestamp of the component */ public static String getLSMComponentID(String filePath) { - String[] tokens = filePath.split(File.separator); - - int arraySize = tokens.length; - String fileName = tokens[arraySize - 1]; - String idxName = tokens[arraySize - 2]; - String dataverse = tokens[arraySize - 3]; - String partitionName = tokens[arraySize - 4]; - - StringBuilder componentId = new StringBuilder(); - componentId.append(partitionName); - componentId.append(File.separator); - componentId.append(dataverse); - componentId.append(File.separator); - componentId.append(idxName); - componentId.append(File.separator); - componentId.append(fileName.substring(0, fileName.lastIndexOf(AbstractLSMIndexFileManager.DELIMITER))); - return componentId.toString(); + final ResourceReference ref = ResourceReference.of(filePath); + final String fileUniqueTimestamp = + ref.getName().substring(0, ref.getName().lastIndexOf(AbstractLSMIndexFileManager.DELIMITER)); + return Paths.get(ref.getRelativePath().toString(), fileUniqueTimestamp).toString(); } public String getComponentId() { @@ -149,18 +136,10 @@ public class LSMComponentProperties { return nodeId; } - public int getNumberOfFiles() { - return numberOfFiles.get(); - } - public int markFileComplete() { return numberOfFiles.decrementAndGet(); } - public void setNumberOfFiles(AtomicInteger numberOfFiles) { - this.numberOfFiles = numberOfFiles; - } - public Long getReplicaLSN() { return replicaLSN; } @@ -173,10 +152,6 @@ public class LSMComponentProperties { return opType; } - public void setOpType(LSMOperationType opType) { - this.opType = opType; - } - public String getNodeUniqueLSN() { return TxnLogUtil.getNodeUniqueLSN(nodeId, originalLSN); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f9e6bae9/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java index eb9e82d..f2747fe 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java @@ -20,24 +20,18 @@ package org.apache.asterix.replication.storage; import java.io.DataInput; import java.io.DataOutputStream; -import java.io.File; import java.io.IOException; import java.io.OutputStream; - -import org.apache.asterix.common.utils.StoragePathUtil; +import java.nio.file.Paths; public class LSMIndexFileProperties { - private String fileName; private long fileSize; private String nodeId; - private String dataverse; - private String idxName; private boolean lsmComponentFile; private String filePath; private boolean requiresAck = false; private long LSNByteOffset; - private int partition; public LSMIndexFileProperties() { } @@ -61,15 +55,6 @@ public class LSMIndexFileProperties { this.requiresAck = requiresAck; } - public void splitFileName() { - String[] tokens = filePath.split(File.separator); - int arraySize = tokens.length; - this.fileName = tokens[arraySize - 1]; - this.idxName = tokens[arraySize - 2]; - this.dataverse = tokens[arraySize - 3]; - this.partition = StoragePathUtil.getPartitionNumFromName(tokens[arraySize - 4]); - } - public void serialize(OutputStream out) throws IOException { DataOutputStream dos = new DataOutputStream(out); dos.writeUTF(nodeId); @@ -100,26 +85,10 @@ public class LSMIndexFileProperties { return fileSize; } - public String getFileName() { - return fileName; - } - public String getNodeId() { return nodeId; } - public String getDataverse() { - return dataverse; - } - - public void setDataverse(String dataverse) { - this.dataverse = dataverse; - } - - public String getIdxName() { - return idxName; - } - public boolean isLSMComponentFile() { return lsmComponentFile; } @@ -128,16 +97,17 @@ public class LSMIndexFileProperties { return requiresAck; } + public String getFileName() { + return Paths.get(filePath).toFile().getName(); + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); - sb.append("File Name: " + fileName + " "); + sb.append("File Path: " + filePath + " "); sb.append("File Size: " + fileSize + " "); sb.append("Node ID: " + nodeId + " "); - sb.append("Partition: " + partition + " "); - sb.append("IDX Name: " + idxName + " "); sb.append("isLSMComponentFile : " + lsmComponentFile + " "); - sb.append("Dataverse: " + dataverse); sb.append("LSN Byte Offset: " + LSNByteOffset); return sb.toString(); } @@ -145,8 +115,4 @@ public class LSMIndexFileProperties { public long getLSNByteOffset() { return LSNByteOffset; } - - public int getPartition() { - return partition; - } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f9e6bae9/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java index cf8e001..7eea4a4 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java @@ -38,14 +38,14 @@ import java.util.logging.Level; import java.util.logging.Logger; import org.apache.asterix.common.cluster.ClusterPartition; -import org.apache.asterix.common.config.ClusterProperties; import org.apache.asterix.common.config.MetadataProperties; import org.apache.asterix.common.replication.IReplicaResourcesManager; +import org.apache.asterix.common.utils.StorageConstants; import org.apache.asterix.common.utils.StoragePathUtil; -import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import org.apache.commons.io.FileUtils; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.storage.common.ILocalResourceRepository; import org.apache.hyracks.storage.common.LocalResource; @@ -63,7 +63,7 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager { nodePartitions = metadataProperties.getNodePartitions(); } - public void deleteIndexFile(LSMIndexFileProperties afp) { + public void deleteIndexFile(LSMIndexFileProperties afp) throws HyracksDataException { String indexPath = getIndexPath(afp); if (indexPath != null) { if (afp.isLSMComponentFile()) { @@ -78,20 +78,12 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager { } } - public String getIndexPath(LSMIndexFileProperties fileProperties) { - fileProperties.splitFileName(); - //get partition path in this node - String partitionPath = localRepository.getPartitionPath(fileProperties.getPartition()); - //get index path - String indexPath = SplitsAndConstraintsUtil.getIndexPath(partitionPath, fileProperties.getPartition(), - fileProperties.getDataverse(), fileProperties.getIdxName()); - - Path path = Paths.get(indexPath); - if (!Files.exists(path)) { - File indexFolder = new File(indexPath); - indexFolder.mkdirs(); + public String getIndexPath(LSMIndexFileProperties fileProperties) throws HyracksDataException { + final FileReference indexPath = localRepository.getIndexPath(Paths.get(fileProperties.getFilePath())); + if (!indexPath.getFile().exists()) { + indexPath.getFile().mkdirs(); } - return indexPath; + return indexPath.toString(); } public void initializeReplicaIndexLSNMap(String indexPath, long currentLSN) throws IOException { @@ -123,21 +115,21 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager { updateReplicaIndexLSNMap(lsmComponentProperties.getReplicaComponentPath(this), lsnMap); } - public Set<File> getReplicaIndexes(String replicaId) { + public Set<File> getReplicaIndexes(String replicaId) throws HyracksDataException { Set<File> remoteIndexesPaths = new HashSet<File>(); ClusterPartition[] partitions = nodePartitions.get(replicaId); for (ClusterPartition partition : partitions) { - remoteIndexesPaths.addAll(getPartitionIndexes(partition.getPartitionId())); + remoteIndexesPaths.addAll(localRepository.getPartitionIndexes(partition.getPartitionId())); } return remoteIndexesPaths; } @Override - public long getPartitionsMinLSN(Set<Integer> partitions) { + public long getPartitionsMinLSN(Set<Integer> partitions) throws HyracksDataException { long minRemoteLSN = Long.MAX_VALUE; for (Integer partition : partitions) { //for every index in replica - Set<File> remoteIndexes = getPartitionIndexes(partition); + Set<File> remoteIndexes = localRepository.getPartitionIndexes(partition); for (File indexFolder : remoteIndexes) { //read LSN map try { @@ -164,7 +156,7 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager { for (File indexFolder : remoteIndexes) { if (getReplicaIndexMaxLSN(indexFolder) < targetLSN) { File localResource = new File( - indexFolder + File.separator + PersistentLocalResourceRepository.METADATA_FILE_NAME); + indexFolder + File.separator + StorageConstants.METADATA_FILE_NAME); LocalResource resource = PersistentLocalResourceRepository.readLocalResource(localResource); laggingReplicaIndexes.put(resource.getId(), indexFolder.getAbsolutePath()); } @@ -190,7 +182,12 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager { public void cleanInvalidLSMComponents(String replicaId) { //for every index in replica - Set<File> remoteIndexes = getReplicaIndexes(replicaId); + Set<File> remoteIndexes = null; + try { + remoteIndexes = getReplicaIndexes(replicaId); + } catch (HyracksDataException e) { + throw new IllegalStateException(e); + } for (File remoteIndexFile : remoteIndexes) { //search for any mask File[] masks = remoteIndexFile.listFiles(LSM_COMPONENTS_MASKS_FILTER); @@ -241,41 +238,11 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager { /** * @param partition - * @return Set of file references to each index in the partition - */ - public Set<File> getPartitionIndexes(int partition) { - Set<File> partitionIndexes = new HashSet<File>(); - String storageDirName = ClusterProperties.INSTANCE.getStorageDirectoryName(); - String partitionStoragePath = localRepository.getPartitionPath(partition) - + StoragePathUtil.prepareStoragePartitionPath(storageDirName, partition); - File partitionRoot = new File(partitionStoragePath); - if (partitionRoot.exists() && partitionRoot.isDirectory()) { - File[] dataverseFileList = partitionRoot.listFiles(); - if (dataverseFileList != null) { - for (File dataverseFile : dataverseFileList) { - if (dataverseFile.isDirectory()) { - File[] indexFileList = dataverseFile.listFiles(); - if (indexFileList != null) { - for (File indexFile : indexFileList) { - if (indexFile.isDirectory()) { - partitionIndexes.add(indexFile); - } - } - } - } - } - } - } - return partitionIndexes; - } - - /** - * @param partition * @return Absolute paths to all partition files */ - public List<String> getPartitionIndexesFiles(int partition, boolean relativePath) { + public List<String> getPartitionIndexesFiles(int partition, boolean relativePath) throws HyracksDataException { List<String> partitionFiles = new ArrayList<String>(); - Set<File> partitionIndexes = getPartitionIndexes(partition); + Set<File> partitionIndexes = localRepository.getPartitionIndexes(partition); for (File indexDir : partitionIndexes) { if (indexDir.isDirectory()) { File[] indexFiles = indexDir.listFiles(LSM_INDEX_FILES_FILTER); @@ -284,8 +251,7 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager { if (!relativePath) { partitionFiles.add(file.getAbsolutePath()); } else { - partitionFiles.add( - StoragePathUtil.getIndexFileRelativePath(file.getAbsolutePath())); + partitionFiles.add(StoragePathUtil.getIndexFileRelativePath(file.getAbsolutePath())); } } } @@ -311,7 +277,7 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager { private static final FilenameFilter LSM_INDEX_FILES_FILTER = new FilenameFilter() { @Override public boolean accept(File dir, String name) { - return name.equalsIgnoreCase(PersistentLocalResourceRepository.METADATA_FILE_NAME) || !name.startsWith("."); + return name.equalsIgnoreCase(StorageConstants.METADATA_FILE_NAME) || !name.startsWith("."); } }; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f9e6bae9/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java index db3647e..04cbef9 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java @@ -23,22 +23,26 @@ import static org.apache.hyracks.api.exceptions.ErrorCode.CANNOT_CREATE_FILE; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; -import java.io.FilenameFilter; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.nio.file.Path; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.SortedMap; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Predicate; import java.util.logging.Level; import java.util.logging.Logger; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.asterix.common.cluster.ClusterPartition; import org.apache.asterix.common.config.MetadataProperties; @@ -47,7 +51,8 @@ import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.replication.IReplicationManager; import org.apache.asterix.common.replication.ReplicationJob; -import org.apache.asterix.common.storage.IndexFileProperties; +import org.apache.asterix.common.storage.DatasetResourceReference; +import org.apache.asterix.common.storage.ResourceReference; import org.apache.asterix.common.utils.StorageConstants; import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.commons.io.FileUtils; @@ -68,15 +73,14 @@ import com.google.common.cache.CacheBuilder; public class PersistentLocalResourceRepository implements ILocalResourceRepository { - // Public constants - public static final String METADATA_FILE_NAME = ".metadata"; + public static final Predicate<Path> INDEX_COMPONENTS = path -> !path.endsWith(StorageConstants.METADATA_FILE_NAME); // Private constants private static final Logger LOGGER = Logger.getLogger(PersistentLocalResourceRepository.class.getName()); private static final String STORAGE_METADATA_DIRECTORY = StorageConstants.METADATA_ROOT; private static final String STORAGE_METADATA_FILE_NAME_PREFIX = "." + StorageConstants.METADATA_ROOT; private static final int MAX_CACHED_RESOURCES = 1000; - private static final FilenameFilter METADATA_FILES_FILTER = - (File dir, String name) -> name.equalsIgnoreCase(METADATA_FILE_NAME); + public static final int RESOURCES_TREE_DEPTH_FROM_STORAGE_ROOT = 6; + // Finals private final IIOManager ioManager; private final String[] mountPoints; @@ -157,8 +161,9 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito //make dirs for the storage metadata file boolean success = storageMetadataDir.mkdirs(); if (!success) { - throw HyracksDataException.create(ErrorCode.ROOT_LOCAL_RESOURCE_COULD_NOT_BE_CREATED, - getClass().getSimpleName(), storageMetadataDir.getAbsolutePath()); + throw HyracksDataException + .create(ErrorCode.ROOT_LOCAL_RESOURCE_COULD_NOT_BE_CREATED, getClass().getSimpleName(), + storageMetadataDir.getAbsolutePath()); } LOGGER.log(Level.INFO, "created the root-metadata-file's directory: " + storageMetadataDir.getAbsolutePath()); @@ -198,8 +203,8 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito throw HyracksDataException.create(CANNOT_CREATE_FILE, parent.getAbsolutePath()); } - try (FileOutputStream fos = new FileOutputStream(resourceFile.getFile()); - ObjectOutputStream oosToFos = new ObjectOutputStream(fos)) { + try (FileOutputStream fos = new FileOutputStream( + resourceFile.getFile()); ObjectOutputStream oosToFos = new ObjectOutputStream(fos)) { oosToFos.writeObject(resource); oosToFos.flush(); } catch (IOException e) { @@ -226,27 +231,23 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito } finally { // Regardless of successfully deleted or not, the operation should be replicated. //if replication enabled, delete resource from remote replicas - if (isReplicationEnabled - && !resourceFile.getFile().getName().startsWith(STORAGE_METADATA_FILE_NAME_PREFIX)) { + if (isReplicationEnabled && !resourceFile.getFile().getName() + .startsWith(STORAGE_METADATA_FILE_NAME_PREFIX)) { createReplicationJob(ReplicationOperation.DELETE, resourceFile); } } } else { - throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.RESOURCE_DOES_NOT_EXIST, - relativePath); + throw HyracksDataException + .create(org.apache.hyracks.api.exceptions.ErrorCode.RESOURCE_DOES_NOT_EXIST, relativePath); } } private static FileReference getLocalResourceFileByName(IIOManager ioManager, String resourcePath) throws HyracksDataException { - String fileName = resourcePath + File.separator + METADATA_FILE_NAME; + String fileName = resourcePath + File.separator + StorageConstants.METADATA_FILE_NAME; return ioManager.resolve(fileName); } - - public Map<Long, LocalResource> loadAndGetAllResources() throws IOException { - //TODO During recovery, the memory usage currently is proportional to the number of resources available. - //This could be fixed by traversing all resources on disk until the required resource is found. - LOGGER.log(Level.INFO, "Loading all resources"); + public Map<Long, LocalResource> getResources(Predicate<LocalResource> filter) throws HyracksDataException { Map<Long, LocalResource> resourcesMap = new HashMap<>(); for (int i = 0; i < mountPoints.length; i++) { File storageRootDir = getStorageRootDirectoryIfExists(ioManager, nodeId, i); @@ -255,108 +256,43 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito continue; } LOGGER.log(Level.INFO, "Getting storage root dir returned " + storageRootDir.getAbsolutePath()); - //load all local resources. - File[] partitions = storageRootDir.listFiles(); - LOGGER.log(Level.INFO, "Number of partitions found = " + partitions.length); - for (File partition : partitions) { - File[] dataverseFileList = partition.listFiles(); - LOGGER.log(Level.INFO, "Reading partition = " + partition.getName() + ". Number of dataverses found: " - + dataverseFileList.length); - if (dataverseFileList != null) { - for (File dataverseFile : dataverseFileList) { - loadDataverse(dataverseFile, resourcesMap); + try (Stream<Path> stream = Files.find(storageRootDir.toPath(), RESOURCES_TREE_DEPTH_FROM_STORAGE_ROOT, + (path, attr) -> path.getFileName().toString().equals(StorageConstants.METADATA_FILE_NAME))) { + final List<File> resourceMetadataFiles = stream.map(Path::toFile).collect(Collectors.toList()); + for (File file : resourceMetadataFiles) { + final LocalResource localResource = PersistentLocalResourceRepository.readLocalResource(file); + if (filter.test(localResource)) { + resourcesMap.put(localResource.getId(), localResource); } } + } catch (IOException e) { + throw HyracksDataException.create(e); } } return resourcesMap; - } - private void loadDataverse(File dataverseFile, Map<Long, LocalResource> resourcesMap) throws HyracksDataException { - LOGGER.log(Level.INFO, "Loading dataverse:" + dataverseFile.getName()); - if (dataverseFile.isDirectory()) { - File[] indexFileList = dataverseFile.listFiles(); - if (indexFileList != null) { - for (File indexFile : indexFileList) { - loadIndex(indexFile, resourcesMap); - } - } - } } - private void loadIndex(File indexFile, Map<Long, LocalResource> resourcesMap) throws HyracksDataException { - LOGGER.log(Level.INFO, "Loading index:" + indexFile.getName()); - if (indexFile.isDirectory()) { - File[] metadataFiles = indexFile.listFiles(METADATA_FILES_FILTER); - if (metadataFiles != null) { - for (File metadataFile : metadataFiles) { - LocalResource localResource = readLocalResource(metadataFile); - LOGGER.log(Level.INFO, "Resource loaded " + localResource.getId() + ":" + localResource.getPath()); - resourcesMap.put(localResource.getId(), localResource); - } - } - } + public Map<Long, LocalResource> loadAndGetAllResources() throws HyracksDataException { + return getResources(p -> true); } @Override public long maxId() throws HyracksDataException { - long maxResourceId = 0; - for (int i = 0; i < mountPoints.length; i++) { - File storageRootDir = getStorageRootDirectoryIfExists(ioManager, nodeId, i); - if (storageRootDir == null) { - continue; - } - - //load all local resources. - File[] partitions = storageRootDir.listFiles(); - for (File partition : partitions) { - //traverse all local resources. - File[] dataverseFileList = partition.listFiles(); - if (dataverseFileList != null) { - for (File dataverseFile : dataverseFileList) { - maxResourceId = getMaxResourceIdForDataverse(dataverseFile, maxResourceId); - } - } - } - } - return maxResourceId; - } - - private long getMaxResourceIdForDataverse(File dataverseFile, long maxSoFar) throws HyracksDataException { - long maxResourceId = maxSoFar; - if (dataverseFile.isDirectory()) { - File[] indexFileList = dataverseFile.listFiles(); - if (indexFileList != null) { - for (File indexFile : indexFileList) { - maxResourceId = getMaxResourceIdForIndex(indexFile, maxResourceId); - } - } - } - return maxResourceId; - } - - private long getMaxResourceIdForIndex(File indexFile, long maxSoFar) throws HyracksDataException { - long maxResourceId = maxSoFar; - if (indexFile.isDirectory()) { - File[] metadataFiles = indexFile.listFiles(METADATA_FILES_FILTER); - if (metadataFiles != null) { - for (File metadataFile : metadataFiles) { - LocalResource localResource = readLocalResource(metadataFile); - maxResourceId = Math.max(maxResourceId, localResource.getId()); - } - } - } - return maxResourceId; + final Map<Long, LocalResource> allResources = loadAndGetAllResources(); + final Optional<Long> max = allResources.keySet().stream().max(Long::compare); + return max.isPresent() ? max.get() : 0; } private static String getFileName(String path) { - return path.endsWith(File.separator) ? (path + METADATA_FILE_NAME) - : (path + File.separator + METADATA_FILE_NAME); + return path.endsWith(File.separator) ? + (path + StorageConstants.METADATA_FILE_NAME) : + (path + File.separator + StorageConstants.METADATA_FILE_NAME); } public static LocalResource readLocalResource(File file) throws HyracksDataException { - try (FileInputStream fis = new FileInputStream(file); - ObjectInputStream oisFromFis = new ObjectInputStream(fis)) { + try (FileInputStream fis = new FileInputStream(file); ObjectInputStream oisFromFis = new ObjectInputStream( + fis)) { LocalResource resource = (LocalResource) oisFromFis.readObject(); if (resource.getVersion() == ITreeIndexFrame.Constants.VERSION) { return resource; @@ -425,8 +361,9 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito * @return A file reference to the storage metadata file. */ private static FileReference getStorageMetadataFile(IIOManager ioManager, String nodeId, int ioDeviceId) { - String storageMetadataFileName = STORAGE_METADATA_DIRECTORY + File.separator + nodeId + "_" + "iodevice" - + ioDeviceId + File.separator + STORAGE_METADATA_FILE_NAME_PREFIX; + String storageMetadataFileName = + STORAGE_METADATA_DIRECTORY + File.separator + nodeId + "_" + "iodevice" + ioDeviceId + File.separator + + STORAGE_METADATA_FILE_NAME_PREFIX; return new FileReference(ioManager.getIODevices().get(ioDeviceId), storageMetadataFileName); } @@ -483,10 +420,6 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito return Collections.unmodifiableSet(nodeInactivePartitions); } - public Set<Integer> getNodeOrignalPartitions() { - return Collections.unmodifiableSet(nodeOriginalPartitions); - } - public synchronized void addActivePartition(int partitonId) { nodeActivePartitions.add(partitonId); nodeInactivePartitions.remove(partitonId); @@ -497,27 +430,43 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito nodeActivePartitions.remove(partitonId); } - private static String getLocalResourceRelativePath(String absolutePath) { - final String[] tokens = absolutePath.split(File.separator); - // Format: storage_dir/partition/dataverse/idx - return tokens[tokens.length - 5] + File.separator + tokens[tokens.length - 4] + File.separator - + tokens[tokens.length - 3] + File.separator + tokens[tokens.length - 2]; + public DatasetResourceReference getLocalResourceReference(String absoluteFilePath) throws HyracksDataException { + //TODO pass relative path + final String localResourcePath = StoragePathUtil.getIndexFileRelativePath(absoluteFilePath); + final LocalResource lr = get(localResourcePath); + return DatasetResourceReference.of(lr); } - public IndexFileProperties getIndexFileRef(String absoluteFilePath) throws HyracksDataException { - //TODO pass relative path - final String[] tokens = absoluteFilePath.split(File.separator); - if (tokens.length < 5) { - throw new HyracksDataException("Invalid file format"); + /** + * Gets a set of files for the indexes in partition {@code partition}. Each file points + * the to where the index's files are stored. + * + * @param partition + * @return The set of indexes files + * @throws HyracksDataException + */ + public Set<File> getPartitionIndexes(int partition) throws HyracksDataException { + final Map<Long, LocalResource> partitionResourcesMap = getResources(resource -> { + DatasetLocalResource dsResource = (DatasetLocalResource) resource.getResource(); + return dsResource.getPartition() == partition; + }); + Set<File> indexes = new HashSet<>(); + for (LocalResource localResource : partitionResourcesMap.values()) { + indexes.add(ioManager.resolve(localResource.getPath()).getFile()); } - String fileName = tokens[tokens.length - 1]; - String index = tokens[tokens.length - 2]; - String dataverse = tokens[tokens.length - 3]; - String partition = tokens[tokens.length - 4]; - int partitionId = StoragePathUtil.getPartitionNumFromName(partition); - String relativePath = getLocalResourceRelativePath(absoluteFilePath); - final LocalResource lr = get(relativePath); - int datasetId = lr == null ? -1 : ((DatasetLocalResource) lr.getResource()).getDatasetId(); - return new IndexFileProperties(partitionId, dataverse, index, fileName, datasetId); + return indexes; + } + + /** + * Given any index file, an absolute {@link FileReference} is returned which points to where the index of + * {@code indexFile} is located. + * + * @param indexFile + * @return + * @throws HyracksDataException + */ + public FileReference getIndexPath(Path indexFile) throws HyracksDataException { + final ResourceReference ref = ResourceReference.of(indexFile.toString()); + return ioManager.resolve(ref.getRelativePath().toString()); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f9e6bae9/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/ReplicationCheckpointManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/ReplicationCheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/ReplicationCheckpointManager.java index 6ce543b..9f5b83c 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/ReplicationCheckpointManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/ReplicationCheckpointManager.java @@ -119,7 +119,7 @@ public class ReplicationCheckpointManager extends AbstractCheckpointManager { return minFirstLSN; } - private long getDeadReplicasMinFirstLSN(Set<String> deadReplicaIds) { + private long getDeadReplicasMinFirstLSN(Set<String> deadReplicaIds) throws HyracksDataException { final IReplicaResourcesManager remoteResourcesManager = txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().getReplicaResourcesManager(); final IApplicationContext propertiesProvider = http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f9e6bae9/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeResource.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeResource.java index b9ad1b1..4cf145b 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeResource.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeResource.java @@ -35,7 +35,7 @@ import org.apache.hyracks.storage.common.buffercache.IBufferCache; public class BTreeResource implements IResource { private static final long serialVersionUID = 1L; - private final String path; + private String path; private final IStorageManager storageManager; private final ITypeTraits[] typeTraits; private final IBinaryComparatorFactory[] comparatorFactories; @@ -63,4 +63,9 @@ public class BTreeResource implements IResource { public String getPath() { return path; } + + @Override + public void setPath(String path) { + this.path = path; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f9e6bae9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LsmResource.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LsmResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LsmResource.java index 6255c1d..b541750 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LsmResource.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LsmResource.java @@ -18,13 +18,10 @@ */ package org.apache.hyracks.storage.am.lsm.common.dataflow; -import java.util.List; import java.util.Map; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.dataflow.value.ITypeTraits; -import org.apache.hyracks.api.io.IIOManager; -import org.apache.hyracks.api.io.IODeviceHandle; import org.apache.hyracks.storage.am.common.api.IMetadataPageManagerFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider; @@ -43,7 +40,7 @@ import org.apache.hyracks.storage.common.LocalResource; public abstract class LsmResource implements IResource { private static final long serialVersionUID = 1L; - protected final String path; + protected String path; protected final IStorageManager storageManager; protected final ITypeTraits[] typeTraits; protected final IBinaryComparatorFactory[] cmpFactories; @@ -88,14 +85,8 @@ public abstract class LsmResource implements IResource { return path; } - public static int getIoDeviceNum(IIOManager ioManager, IODeviceHandle deviceHandle) { - List<IODeviceHandle> ioDevices = ioManager.getIODevices(); - for (int i = 0; i < ioDevices.size(); i++) { - IODeviceHandle device = ioDevices.get(i); - if (device == deviceHandle) { - return i; - } - } - return -1; + @Override + public void setPath(String path) { + this.path = path; } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f9e6bae9/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeResource.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeResource.java index df4fbf2..f9eb844 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeResource.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeResource.java @@ -35,7 +35,7 @@ import org.apache.hyracks.storage.common.IStorageManager; public class RTreeResource implements IResource { private static final long serialVersionUID = 1L; - private final String path; + private String path; private final IStorageManager storageManager; private final ITypeTraits[] typeTraits; private final IBinaryComparatorFactory[] comparatorFactories; @@ -68,4 +68,8 @@ public class RTreeResource implements IResource { return path; } + @Override + public void setPath(String path) { + this.path = path; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f9e6bae9/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResource.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResource.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResource.java index bb27023..7b9166d 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResource.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResource.java @@ -28,4 +28,11 @@ public interface IResource extends Serializable { IIndex createInstance(INCServiceContext ncServiceCtx) throws HyracksDataException; String getPath(); + + /** + * Sets the path of {@link IResource}. + * + * @param path + */ + void setPath(String path); }
