Repository: asterixdb Updated Branches: refs/heads/master 3180d8702 -> 929344e93
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java index 6ebf52c..96a31c6 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java @@ -111,8 +111,8 @@ public class LogBuffer implements ILogBuffer { logRecord.isFlushed(false); flushQ.add(logRecord); } - } else if (logRecord.getLogSource() == LogSource.REMOTE - && (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT)) { + } else if (logRecord.getLogSource() == LogSource.REMOTE && (logRecord.getLogType() == LogType.JOB_COMMIT + || logRecord.getLogType() == LogType.ABORT || logRecord.getLogType() == LogType.FLUSH)) { remoteJobsQ.add(logRecord); } this.notify(); @@ -260,10 +260,9 @@ public class LogBuffer implements ILogBuffer { } else if (logRecord.getLogType() == LogType.WAIT) { notifyWaitTermination(); } - } else if (logRecord.getLogSource() == LogSource.REMOTE) { - if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) { - notifyReplicationTermination(); - } + } else if (logRecord.getLogSource() == LogSource.REMOTE && (logRecord.getLogType() == LogType.JOB_COMMIT + || logRecord.getLogType() == LogType.ABORT || logRecord.getLogType() == LogType.FLUSH)) { + notifyReplicationTermination(); } logRecord = logBufferTailReader.next(); } @@ -295,10 +294,12 @@ public class LogBuffer implements ILogBuffer { public void notifyFlushTermination() throws ACIDException { LogRecord logRecord = null; - try { - logRecord = (LogRecord) flushQ.take(); - } catch (InterruptedException e) { - //ignore + while (logRecord == null) { + try { + logRecord = (LogRecord) flushQ.take(); + } catch (InterruptedException e) { //NOSONAR LogFlusher should survive interrupts + //ignore + } } synchronized (logRecord) { logRecord.isFlushed(true); @@ -316,10 +317,12 @@ public class LogBuffer implements ILogBuffer { public void notifyReplicationTermination() { LogRecord logRecord = null; - try { - logRecord = (LogRecord) remoteJobsQ.take(); - } catch (InterruptedException e) { - //ignore + while (logRecord == null) { + try { + logRecord = (LogRecord) remoteJobsQ.take(); + } catch (InterruptedException e) { //NOSONAR LogFlusher should survive interrupts + //ignore + } } logRecord.isFlushed(true); IReplicationThread replicationThread = logRecord.getReplicationThread(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java index d3c056d..a6ceba8 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java @@ -411,4 +411,8 @@ public abstract class AbstractLSMIndexFileManager implements ILSMIndexFileManage prevTimestamp = ts; return ts; } + + public static String getComponentEndTime(String fileName) { + return fileName.split(DELIMITER)[1]; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java index 393aa6a..5368591 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java @@ -560,22 +560,31 @@ public class LSMHarness implements ILSMHarness { if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("Started a flush operation for index: " + lsmIndex + " ..."); } - - ILSMDiskComponent newComponent = null; - boolean failedOperation = false; try { - newComponent = lsmIndex.flush(operation); - operation.getCallback().afterOperation(LSMIOOperationType.FLUSH, null, newComponent); - newComponent.markAsValid(lsmIndex.isDurable()); - } catch (Throwable e) { // NOSONAR Log and re-throw - failedOperation = true; - if (LOGGER.isLoggable(Level.SEVERE)) { - LOGGER.log(Level.SEVERE, "Flush failed on " + lsmIndex, e); + ILSMDiskComponent newComponent = null; + boolean failedOperation = false; + try { + newComponent = lsmIndex.flush(operation); + operation.getCallback().afterOperation(LSMIOOperationType.FLUSH, null, newComponent); + newComponent.markAsValid(lsmIndex.isDurable()); + } catch (Throwable e) { // NOSONAR Log and re-throw + failedOperation = true; + if (LOGGER.isLoggable(Level.SEVERE)) { + LOGGER.log(Level.SEVERE, "Flush failed on " + lsmIndex, e); + } + throw e; + } finally { + exitComponents(ctx, LSMOperationType.FLUSH, newComponent, failedOperation); + operation.getCallback().afterFinalize(LSMIOOperationType.FLUSH, newComponent); + } - throw e; } finally { - exitComponents(ctx, LSMOperationType.FLUSH, newComponent, failedOperation); - operation.getCallback().afterFinalize(LSMIOOperationType.FLUSH, newComponent); + /* + * Completion of flush/merge operations is done explicitly here to make sure all generated files during + * io operations is completed before the io operation is declared complete + */ + opTracker.completeOperation(lsmIndex, LSMOperationType.FLUSH, ctx.getSearchOperationCallback(), + ctx.getModificationCallback()); } if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("Finished the flush operation for index: " + lsmIndex); @@ -612,37 +621,42 @@ public class LSMHarness implements ILSMHarness { if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("Started a merge operation for index: " + lsmIndex + " ..."); } - - ILSMDiskComponent newComponent = null; - boolean failedOperation = false; try { - newComponent = lsmIndex.merge(operation); - operation.getCallback().afterOperation(LSMIOOperationType.MERGE, ctx.getComponentHolder(), newComponent); - newComponent.markAsValid(lsmIndex.isDurable()); - } catch (Throwable e) { // NOSONAR: Log and re-throw - failedOperation = true; - if (LOGGER.isLoggable(Level.SEVERE)) { - LOGGER.log(Level.SEVERE, "Failed merge operation on " + lsmIndex, e); + ILSMDiskComponent newComponent = null; + boolean failedOperation = false; + try { + newComponent = lsmIndex.merge(operation); + operation.getCallback() + .afterOperation(LSMIOOperationType.MERGE, ctx.getComponentHolder(), newComponent); + newComponent.markAsValid(lsmIndex.isDurable()); + } catch (Throwable e) { // NOSONAR: Log and re-throw + failedOperation = true; + if (LOGGER.isLoggable(Level.SEVERE)) { + LOGGER.log(Level.SEVERE, "Failed merge operation on " + lsmIndex, e); + } + throw e; + } finally { + exitComponents(ctx, LSMOperationType.MERGE, newComponent, failedOperation); + operation.getCallback().afterFinalize(LSMIOOperationType.MERGE, newComponent); } - throw e; } finally { - exitComponents(ctx, LSMOperationType.MERGE, newComponent, failedOperation); - // Completion of the merge operation is called here to and not on afterOperation because - // Deletion of the old components comes after afterOperation is called and the number of - // io operation should not be decremented before the operation is complete to avoid - // index destroy from competing with the merge on deletion of the files. - // The order becomes: - // 1. scheduleMerge - // 2. enterComponents - // 3. beforeOperation (increment the numOfIoOperations) - // 4. merge - // 5. exitComponents - // 6. afterOperation (no op) - // 7. delete components - // 8. completeOperation (decrement the numOfIoOperations) + /* + * Completion of the merge operation is called here to and not on afterOperation because + * deletion of old components comes after afterOperation is called and the number of + * io operation should not be decremented before the operation is complete to avoid + * index destroy from competing with the merge on deletion of the files. + * The order becomes: + * 1. scheduleMerge + * 2. enterComponents + * 3. beforeOperation (increment the numOfIoOperations) + * 4. merge + * 5. exitComponents + * 6. afterOperation (no op) + * 7. delete components + * 8. completeOperation (decrement the numOfIoOperations) + */ opTracker.completeOperation(lsmIndex, LSMOperationType.MERGE, ctx.getSearchOperationCallback(), ctx.getModificationCallback()); - operation.getCallback().afterFinalize(LSMIOOperationType.MERGE, newComponent); } if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("Finished the merge operation for index: " + lsmIndex);
