Repository: asterixdb
Updated Branches:
  refs/heads/master fff200ca8 -> ef173f34f


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index f7023b3..b21eb29 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -45,6 +45,8 @@ import org.apache.asterix.common.config.MetadataProperties;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.replication.ReplicationJob;
+import org.apache.asterix.common.storage.IndexFileProperties;
+import org.apache.asterix.common.transactions.Resource;
 import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.commons.io.FileUtils;
@@ -483,20 +485,27 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
         nodeActivePartitions.remove(partitonId);
     }
 
-    /**
-     * @param resourceAbsolutePath
-     * @return the resource relative path starting from the partition directory
-     */
-    public static String getResourceRelativePath(String resourceAbsolutePath) {
-        String[] tokens = resourceAbsolutePath.split(File.separator);
-        //partition/dataverse/idx/fileName
-        return tokens[tokens.length - 4] + File.separator + 
tokens[tokens.length - 3] + File.separator
-                + tokens[tokens.length - 2] + File.separator + 
tokens[tokens.length - 1];
+    private static String getLocalResourceRelativePath(String absolutePath) {
+        final String[] tokens = absolutePath.split(File.separator);
+        // Format: storage_dir/partition/dataverse/idx
+        return tokens[tokens.length - 5] + File.separator + 
tokens[tokens.length - 4] + File.separator
+                + tokens[tokens.length - 3] + File.separator + 
tokens[tokens.length - 2];
     }
 
-    public static int getResourcePartition(String resourceAbsolutePath) {
-        String[] tokens = resourceAbsolutePath.split(File.separator);
-        //partition/dataverse/idx/fileName
-        return StoragePathUtil.getPartitionNumFromName(tokens[tokens.length - 
4]);
+    public IndexFileProperties getIndexFileRef(String absoluteFilePath) throws 
HyracksDataException {
+        //TODO pass relative path
+        final String[] tokens = absoluteFilePath.split(File.separator);
+        if (tokens.length < 5) {
+            throw new HyracksDataException("Invalid file format");
+        }
+        String fileName = tokens[tokens.length - 1];
+        String index = tokens[tokens.length - 2];
+        String dataverse = tokens[tokens.length - 3];
+        String partition = tokens[tokens.length - 4];
+        int partitionId = StoragePathUtil.getPartitionNumFromName(partition);
+        String relativePath = getLocalResourceRelativePath(absoluteFilePath);
+        final LocalResource lr = get(relativePath);
+        int datasetId = lr == null ? -1 : ((Resource) 
lr.getResource()).datasetId();
+        return new IndexFileProperties(partitionId, dataverse, index, 
fileName, datasetId);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
index a248f77..d885f00 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
@@ -18,8 +18,12 @@
  */
 package org.apache.asterix.transaction.management.service.logging;
 
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.common.transactions.ILogRecord;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionManager;
@@ -30,15 +34,37 @@ import org.apache.asterix.common.transactions.LogType;
 public class LogManagerWithReplication extends LogManager {
 
     private IReplicationManager replicationManager;
+    private final IReplicationStrategy replicationStrategy;
+    private final Set<Integer> replicatedJob = ConcurrentHashMap.newKeySet();
 
-    public LogManagerWithReplication(ITransactionSubsystem txnSubsystem) {
+    public LogManagerWithReplication(ITransactionSubsystem txnSubsystem, 
IReplicationStrategy replicationStrategy) {
         super(txnSubsystem);
+        this.replicationStrategy = replicationStrategy;
     }
 
     @Override
     public void log(ILogRecord logRecord) throws ACIDException {
-        //only locally generated logs should be replicated
-        logRecord.setReplicated(logRecord.getLogSource() == LogSource.LOCAL && 
logRecord.getLogType() != LogType.WAIT);
+        boolean shouldReplicate = logRecord.getLogSource() == LogSource.LOCAL 
&& logRecord.getLogType() != LogType.WAIT;
+        if (shouldReplicate) {
+            switch (logRecord.getLogType()) {
+                case LogType.ENTITY_COMMIT:
+                case LogType.UPSERT_ENTITY_COMMIT:
+                case LogType.UPDATE:
+                case LogType.FLUSH:
+                    shouldReplicate = 
replicationStrategy.isMatch(logRecord.getDatasetId());
+                    if (shouldReplicate && 
!replicatedJob.contains(logRecord.getJobId())) {
+                        replicatedJob.add(logRecord.getJobId());
+                    }
+                    break;
+                case LogType.JOB_COMMIT:
+                case LogType.ABORT:
+                    shouldReplicate = 
replicatedJob.remove(logRecord.getJobId());
+                    break;
+                default:
+                    shouldReplicate = false;
+            }
+        }
+        logRecord.setReplicated(shouldReplicate);
 
         //Remote flush logs do not need to be flushed separately since they 
may not trigger local flush
         if (logRecord.getLogType() == LogType.FLUSH && 
logRecord.getLogSource() == LogSource.LOCAL) {
@@ -74,7 +100,8 @@ public class LogManagerWithReplication extends LogManager {
                     }
 
                     //wait for job Commit/Abort ACK from replicas
-                    if (logRecord.getLogType() == LogType.JOB_COMMIT || 
logRecord.getLogType() == LogType.ABORT) {
+                    if (logRecord.isReplicated() && (logRecord.getLogType() == 
LogType.JOB_COMMIT
+                            || logRecord.getLogType() == LogType.ABORT)) {
                         while 
(!replicationManager.hasBeenReplicated(logRecord)) {
                             try {
                                 logRecord.wait();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNInstanceUtil.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNInstanceUtil.java
 
b/asterixdb/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNInstanceUtil.java
index 40df685..d606f79 100644
--- 
a/asterixdb/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNInstanceUtil.java
+++ 
b/asterixdb/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNInstanceUtil.java
@@ -26,6 +26,7 @@ import org.apache.asterix.aoya.AsterixYARNClient;
 import org.apache.asterix.aoya.Utils;
 import org.apache.asterix.event.schema.yarnCluster.Cluster;
 import org.apache.asterix.event.schema.yarnCluster.Node;
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
@@ -37,6 +38,7 @@ import org.junit.Assert;
 public class AsterixYARNInstanceUtil {
     private static final String PATH_ACTUAL = "ittest/";
     private static final String INSTANCE_NAME = "asterix-integration-test";
+    private static final String TXN_LOG_PATH = "/tmp/asterix-yarn";
     private MiniYARNCluster miniCluster;
     private YarnConfiguration appConf;
     public String aoyaHome;
@@ -120,4 +122,11 @@ public class AsterixYARNInstanceUtil {
             outdir.delete();
         }
     }
+
+    public static void cleanUp() {
+        File txnLogFile = new File(TXN_LOG_PATH);
+        if (txnLogFile.exists()) {
+            FileUtils.deleteQuietly(txnLogFile);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNLibraryTestIT.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNLibraryTestIT.java
 
b/asterixdb/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNLibraryTestIT.java
index 21c1e18..157cb12 100644
--- 
a/asterixdb/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNLibraryTestIT.java
+++ 
b/asterixdb/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNLibraryTestIT.java
@@ -48,6 +48,7 @@ public class AsterixYARNLibraryTestIT {
 
     @BeforeClass
     public static void setUp() throws Exception {
+        AsterixYARNInstanceUtil.cleanUp();
         instance = new AsterixYARNInstanceUtil();
         appConf = instance.setUp();
         configPath = instance.configPath;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef173f34/asterixdb/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNLifecycleIT.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNLifecycleIT.java
 
b/asterixdb/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNLifecycleIT.java
index 4e7ea8e..60afe91 100644
--- 
a/asterixdb/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNLifecycleIT.java
+++ 
b/asterixdb/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNLifecycleIT.java
@@ -47,6 +47,7 @@ public class AsterixYARNLifecycleIT {
 
     @BeforeClass
     public static void setUp() throws Exception {
+        AsterixYARNInstanceUtil.cleanUp();
         instance = new AsterixYARNInstanceUtil();
         appConf = instance.setUp();
         configPath = instance.configPath;

Reply via email to