Repository: asterixdb Updated Branches: refs/heads/master fff200ca8 -> ef173f34f
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java index f7023b3..b21eb29 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java @@ -45,6 +45,8 @@ import org.apache.asterix.common.config.MetadataProperties; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.replication.IReplicationManager; import org.apache.asterix.common.replication.ReplicationJob; +import org.apache.asterix.common.storage.IndexFileProperties; +import org.apache.asterix.common.transactions.Resource; import org.apache.asterix.common.utils.StorageConstants; import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.commons.io.FileUtils; @@ -483,20 +485,27 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito nodeActivePartitions.remove(partitonId); } - /** - * @param resourceAbsolutePath - * @return the resource relative path starting from the partition directory - */ - public static String getResourceRelativePath(String resourceAbsolutePath) { - String[] tokens = resourceAbsolutePath.split(File.separator); - //partition/dataverse/idx/fileName - return tokens[tokens.length - 4] + File.separator + tokens[tokens.length - 3] + File.separator - + tokens[tokens.length - 2] + File.separator + tokens[tokens.length - 1]; + private static String getLocalResourceRelativePath(String absolutePath) { + final String[] tokens = absolutePath.split(File.separator); + // Format: storage_dir/partition/dataverse/idx + return tokens[tokens.length - 5] + File.separator + tokens[tokens.length - 4] + File.separator + + tokens[tokens.length - 3] + File.separator + tokens[tokens.length - 2]; } - public static int getResourcePartition(String resourceAbsolutePath) { - String[] tokens = resourceAbsolutePath.split(File.separator); - //partition/dataverse/idx/fileName - return StoragePathUtil.getPartitionNumFromName(tokens[tokens.length - 4]); + public IndexFileProperties getIndexFileRef(String absoluteFilePath) throws HyracksDataException { + //TODO pass relative path + final String[] tokens = absoluteFilePath.split(File.separator); + if (tokens.length < 5) { + throw new HyracksDataException("Invalid file format"); + } + String fileName = tokens[tokens.length - 1]; + String index = tokens[tokens.length - 2]; + String dataverse = tokens[tokens.length - 3]; + String partition = tokens[tokens.length - 4]; + int partitionId = StoragePathUtil.getPartitionNumFromName(partition); + String relativePath = getLocalResourceRelativePath(absoluteFilePath); + final LocalResource lr = get(relativePath); + int datasetId = lr == null ? -1 : ((Resource) lr.getResource()).datasetId(); + return new IndexFileProperties(partitionId, dataverse, index, fileName, datasetId); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java index a248f77..d885f00 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java @@ -18,8 +18,12 @@ */ package org.apache.asterix.transaction.management.service.logging; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.replication.IReplicationManager; +import org.apache.asterix.common.replication.IReplicationStrategy; import org.apache.asterix.common.transactions.ILogRecord; import org.apache.asterix.common.transactions.ITransactionContext; import org.apache.asterix.common.transactions.ITransactionManager; @@ -30,15 +34,37 @@ import org.apache.asterix.common.transactions.LogType; public class LogManagerWithReplication extends LogManager { private IReplicationManager replicationManager; + private final IReplicationStrategy replicationStrategy; + private final Set<Integer> replicatedJob = ConcurrentHashMap.newKeySet(); - public LogManagerWithReplication(ITransactionSubsystem txnSubsystem) { + public LogManagerWithReplication(ITransactionSubsystem txnSubsystem, IReplicationStrategy replicationStrategy) { super(txnSubsystem); + this.replicationStrategy = replicationStrategy; } @Override public void log(ILogRecord logRecord) throws ACIDException { - //only locally generated logs should be replicated - logRecord.setReplicated(logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.WAIT); + boolean shouldReplicate = logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.WAIT; + if (shouldReplicate) { + switch (logRecord.getLogType()) { + case LogType.ENTITY_COMMIT: + case LogType.UPSERT_ENTITY_COMMIT: + case LogType.UPDATE: + case LogType.FLUSH: + shouldReplicate = replicationStrategy.isMatch(logRecord.getDatasetId()); + if (shouldReplicate && !replicatedJob.contains(logRecord.getJobId())) { + replicatedJob.add(logRecord.getJobId()); + } + break; + case LogType.JOB_COMMIT: + case LogType.ABORT: + shouldReplicate = replicatedJob.remove(logRecord.getJobId()); + break; + default: + shouldReplicate = false; + } + } + logRecord.setReplicated(shouldReplicate); //Remote flush logs do not need to be flushed separately since they may not trigger local flush if (logRecord.getLogType() == LogType.FLUSH && logRecord.getLogSource() == LogSource.LOCAL) { @@ -74,7 +100,8 @@ public class LogManagerWithReplication extends LogManager { } //wait for job Commit/Abort ACK from replicas - if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) { + if (logRecord.isReplicated() && (logRecord.getLogType() == LogType.JOB_COMMIT + || logRecord.getLogType() == LogType.ABORT)) { while (!replicationManager.hasBeenReplicated(logRecord)) { try { logRecord.wait(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNInstanceUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNInstanceUtil.java b/asterixdb/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNInstanceUtil.java index 40df685..d606f79 100644 --- a/asterixdb/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNInstanceUtil.java +++ b/asterixdb/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNInstanceUtil.java @@ -26,6 +26,7 @@ import org.apache.asterix.aoya.AsterixYARNClient; import org.apache.asterix.aoya.Utils; import org.apache.asterix.event.schema.yarnCluster.Cluster; import org.apache.asterix.event.schema.yarnCluster.Node; +import org.apache.commons.io.FileUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; @@ -37,6 +38,7 @@ import org.junit.Assert; public class AsterixYARNInstanceUtil { private static final String PATH_ACTUAL = "ittest/"; private static final String INSTANCE_NAME = "asterix-integration-test"; + private static final String TXN_LOG_PATH = "/tmp/asterix-yarn"; private MiniYARNCluster miniCluster; private YarnConfiguration appConf; public String aoyaHome; @@ -120,4 +122,11 @@ public class AsterixYARNInstanceUtil { outdir.delete(); } } + + public static void cleanUp() { + File txnLogFile = new File(TXN_LOG_PATH); + if (txnLogFile.exists()) { + FileUtils.deleteQuietly(txnLogFile); + } + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNLibraryTestIT.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNLibraryTestIT.java b/asterixdb/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNLibraryTestIT.java index 21c1e18..157cb12 100644 --- a/asterixdb/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNLibraryTestIT.java +++ b/asterixdb/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNLibraryTestIT.java @@ -48,6 +48,7 @@ public class AsterixYARNLibraryTestIT { @BeforeClass public static void setUp() throws Exception { + AsterixYARNInstanceUtil.cleanUp(); instance = new AsterixYARNInstanceUtil(); appConf = instance.setUp(); configPath = instance.configPath; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNLifecycleIT.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNLifecycleIT.java b/asterixdb/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNLifecycleIT.java index 4e7ea8e..60afe91 100644 --- a/asterixdb/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNLifecycleIT.java +++ b/asterixdb/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNLifecycleIT.java @@ -47,6 +47,7 @@ public class AsterixYARNLifecycleIT { @BeforeClass public static void setUp() throws Exception { + AsterixYARNInstanceUtil.cleanUp(); instance = new AsterixYARNInstanceUtil(); appConf = instance.setUp(); configPath = instance.configPath;
