Repository: asterixdb
Updated Branches:
  refs/heads/master 3180d8702 -> 929344e93


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
index 6ebf52c..96a31c6 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
@@ -111,8 +111,8 @@ public class LogBuffer implements ILogBuffer {
                     logRecord.isFlushed(false);
                     flushQ.add(logRecord);
                 }
-            } else if (logRecord.getLogSource() == LogSource.REMOTE
-                    && (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT)) {
+            } else if (logRecord.getLogSource() == LogSource.REMOTE && (logRecord.getLogType() == LogType.JOB_COMMIT
+                    || logRecord.getLogType() == LogType.ABORT || logRecord.getLogType() == LogType.FLUSH)) {
                 remoteJobsQ.add(logRecord);
             }
             this.notify();
@@ -260,10 +260,9 @@ public class LogBuffer implements ILogBuffer {
                     } else if (logRecord.getLogType() == LogType.WAIT) {
                         notifyWaitTermination();
                     }
-                } else if (logRecord.getLogSource() == LogSource.REMOTE) {
-                    if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
-                        notifyReplicationTermination();
-                    }
+                } else if (logRecord.getLogSource() == LogSource.REMOTE && (logRecord.getLogType() == LogType.JOB_COMMIT
+                        || logRecord.getLogType() == LogType.ABORT || logRecord.getLogType() == LogType.FLUSH)) {
+                    notifyReplicationTermination();
                 }
                 logRecord = logBufferTailReader.next();
             }
@@ -295,10 +294,12 @@ public class LogBuffer implements ILogBuffer {
 
     public void notifyFlushTermination() throws ACIDException {
         LogRecord logRecord = null;
-        try {
-            logRecord = (LogRecord) flushQ.take();
-        } catch (InterruptedException e) {
-            //ignore
+        while (logRecord == null) {
+            try {
+                logRecord = (LogRecord) flushQ.take();
+            } catch (InterruptedException e) { //NOSONAR LogFlusher should survive interrupts
+                //ignore
+            }
         }
         synchronized (logRecord) {
             logRecord.isFlushed(true);
@@ -316,10 +317,12 @@ public class LogBuffer implements ILogBuffer {
 
     public void notifyReplicationTermination() {
         LogRecord logRecord = null;
-        try {
-            logRecord = (LogRecord) remoteJobsQ.take();
-        } catch (InterruptedException e) {
-            //ignore
+        while (logRecord == null) {
+            try {
+                logRecord = (LogRecord) remoteJobsQ.take();
+            } catch (InterruptedException e) { //NOSONAR LogFlusher should survive interrupts
+                //ignore
+            }
         }
         logRecord.isFlushed(true);
         IReplicationThread replicationThread = logRecord.getReplicationThread();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
index d3c056d..a6ceba8 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
@@ -411,4 +411,8 @@ public abstract class AbstractLSMIndexFileManager implements ILSMIndexFileManage
         prevTimestamp = ts;
         return ts;
     }
+
+    public static String getComponentEndTime(String fileName) {
+        return fileName.split(DELIMITER)[1];
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 393aa6a..5368591 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -560,22 +560,31 @@ public class LSMHarness implements ILSMHarness {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Started a flush operation for index: " + lsmIndex + " ...");
         }
-
-        ILSMDiskComponent newComponent = null;
-        boolean failedOperation = false;
         try {
-            newComponent = lsmIndex.flush(operation);
-            operation.getCallback().afterOperation(LSMIOOperationType.FLUSH, null, newComponent);
-            newComponent.markAsValid(lsmIndex.isDurable());
-        } catch (Throwable e) { // NOSONAR Log and re-throw
-            failedOperation = true;
-            if (LOGGER.isLoggable(Level.SEVERE)) {
-                LOGGER.log(Level.SEVERE, "Flush failed on " + lsmIndex, e);
+            ILSMDiskComponent newComponent = null;
+            boolean failedOperation = false;
+            try {
+                newComponent = lsmIndex.flush(operation);
+                operation.getCallback().afterOperation(LSMIOOperationType.FLUSH, null, newComponent);
+                newComponent.markAsValid(lsmIndex.isDurable());
+            } catch (Throwable e) { // NOSONAR Log and re-throw
+                failedOperation = true;
+                if (LOGGER.isLoggable(Level.SEVERE)) {
+                    LOGGER.log(Level.SEVERE, "Flush failed on " + lsmIndex, e);
+                }
+                throw e;
+            } finally {
+                exitComponents(ctx, LSMOperationType.FLUSH, newComponent, failedOperation);
+                operation.getCallback().afterFinalize(LSMIOOperationType.FLUSH, newComponent);
+
             }
-            throw e;
         } finally {
-            exitComponents(ctx, LSMOperationType.FLUSH, newComponent, failedOperation);
-            operation.getCallback().afterFinalize(LSMIOOperationType.FLUSH, newComponent);
+            /*
+             * Completion of flush/merge operations is done explicitly here to make sure all generated files during
+             * io operations is completed before the io operation is declared complete
+             */
+            opTracker.completeOperation(lsmIndex, LSMOperationType.FLUSH, ctx.getSearchOperationCallback(),
+                    ctx.getModificationCallback());
         }
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Finished the flush operation for index: " + lsmIndex);
@@ -612,37 +621,42 @@ public class LSMHarness implements ILSMHarness {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Started a merge operation for index: " + lsmIndex + " ...");
         }
-
-        ILSMDiskComponent newComponent = null;
-        boolean failedOperation = false;
         try {
-            newComponent = lsmIndex.merge(operation);
-            operation.getCallback().afterOperation(LSMIOOperationType.MERGE, ctx.getComponentHolder(), newComponent);
-            newComponent.markAsValid(lsmIndex.isDurable());
-        } catch (Throwable e) { // NOSONAR: Log and re-throw
-            failedOperation = true;
-            if (LOGGER.isLoggable(Level.SEVERE)) {
-                LOGGER.log(Level.SEVERE, "Failed merge operation on " + lsmIndex, e);
+            ILSMDiskComponent newComponent = null;
+            boolean failedOperation = false;
+            try {
+                newComponent = lsmIndex.merge(operation);
+                operation.getCallback()
+                        .afterOperation(LSMIOOperationType.MERGE, ctx.getComponentHolder(), newComponent);
+                newComponent.markAsValid(lsmIndex.isDurable());
+            } catch (Throwable e) { // NOSONAR: Log and re-throw
+                failedOperation = true;
+                if (LOGGER.isLoggable(Level.SEVERE)) {
+                    LOGGER.log(Level.SEVERE, "Failed merge operation on " + lsmIndex, e);
+                }
+                throw e;
+            } finally {
+                exitComponents(ctx, LSMOperationType.MERGE, newComponent, failedOperation);
+                operation.getCallback().afterFinalize(LSMIOOperationType.MERGE, newComponent);
             }
-            throw e;
         } finally {
-            exitComponents(ctx, LSMOperationType.MERGE, newComponent, failedOperation);
-            // Completion of the merge operation is called here to and not on afterOperation because
-            // Deletion of the old components comes after afterOperation is called and the number of
-            // io operation should not be decremented before the operation is complete to avoid
-            // index destroy from competing with the merge on deletion of the files.
-            // The order becomes:
-            // 1. scheduleMerge
-            // 2. enterComponents
-            // 3. beforeOperation (increment the numOfIoOperations)
-            // 4. merge
-            // 5. exitComponents
-            // 6. afterOperation (no op)
-            // 7. delete components
-            // 8. completeOperation (decrement the numOfIoOperations)
+            /*
+             * Completion of the merge operation is called here to and not on afterOperation because
+             * deletion of old components comes after afterOperation is called and the number of
+             * io operation should not be decremented before the operation is complete to avoid
+             * index destroy from competing with the merge on deletion of the files.
+             * The order becomes:
+             * 1. scheduleMerge
+             * 2. enterComponents
+             * 3. beforeOperation (increment the numOfIoOperations)
+             * 4. merge
+             * 5. exitComponents
+             * 6. afterOperation (no op)
+             * 7. delete components
+             * 8. completeOperation (decrement the numOfIoOperations)
+             */
             opTracker.completeOperation(lsmIndex, LSMOperationType.MERGE, ctx.getSearchOperationCallback(),
                     ctx.getModificationCallback());
-            operation.getCallback().afterFinalize(LSMIOOperationType.MERGE, newComponent);
         }
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Finished the merge operation for index: " + lsmIndex);

Reply via email to