Repository: asterixdb Updated Branches: refs/heads/master 8b3e67af4 -> ab8375ea3
[ASTERIXDB-2118][STO] Ensure flush ordering of memory components - user model changes: no - storage format changes: no - interface changes: no - Fix the bug of AsynchronousScheduler by waking up the next flush only when the current operation is a FLUSH operation Change-Id: I7de4a1625fdd3faaa07f65be2ebc714ec7564b29 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2038 Reviewed-by: Ian Maxon <[email protected]> Tested-by: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/ab8375ea Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/ab8375ea Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/ab8375ea Branch: refs/heads/master Commit: ab8375ea3edec759d5590f5792926211ff22ca1b Parents: 8b3e67a Author: luochen01 <[email protected]> Authored: Fri Sep 29 15:02:34 2017 -0700 Committer: Luo Chen <[email protected]> Committed: Fri Sep 29 20:27:34 2017 -0700 ---------------------------------------------------------------------- .../lsm/common/impls/AsynchronousScheduler.java | 32 ++++++++++++-------- 1 file changed, 19 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab8375ea/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java index 5f6766f..438bb0b 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java @@ -60,19 +60,21 @@ public class AsynchronousScheduler implements ILSMIOOperationScheduler { super.afterExecute(r, t); LSMIOOperationTask<Boolean> task = (LSMIOOperationTask<Boolean>) r; ILSMIOOperation executedOp = task.getOperation(); - String id = executedOp.getIndexIdentifier(); - synchronized (this) { - runningFlushOperations.remove(id); - if (waitingFlushOperations.containsKey(id)) { - try { - ILSMIOOperation op = waitingFlushOperations.get(id).poll(); - if (op != null) { - scheduleOperation(op); - } else { - waitingFlushOperations.remove(id); + if (executedOp.getIOOpertionType() == LSMIOOpertionType.FLUSH) { + String id = executedOp.getIndexIdentifier(); + synchronized (this) { + runningFlushOperations.remove(id); + if (waitingFlushOperations.containsKey(id)) { + try { + ILSMIOOperation op = waitingFlushOperations.get(id).poll(); + if (op != null) { + scheduleOperation(op); + } else { + waitingFlushOperations.remove(id); + } + } catch (HyracksDataException e) { + t = e.getCause(); } - } catch (HyracksDataException e) { - t = e.getCause(); } } } @@ -84,7 +86,7 @@ public class AsynchronousScheduler implements ILSMIOOperationScheduler { public void scheduleOperation(ILSMIOOperation operation) throws HyracksDataException { if (operation.getIOOpertionType() == LSMIOOpertionType.MERGE) { executor.submit(operation); - } else { + } else if (operation.getIOOpertionType() == LSMIOOpertionType.FLUSH) { String id = operation.getIndexIdentifier(); synchronized (executor) { if (runningFlushOperations.containsKey(id)) { @@ -100,6 +102,10 @@ public class AsynchronousScheduler implements ILSMIOOperationScheduler { executor.submit(operation); } } + } else { + // this should never happen + // just guard here to avoid silient failures in case of future extensions + throw new IllegalArgumentException("Unknown operation type " + operation.getIOOpertionType()); } } }
