Repository: asterixdb Updated Branches: refs/heads/master a5e3d0ea7 -> fcd89f5a9
[ASTERIXDB-2045][STO] Do Not Wait For Lagging Merge on Failed Flush - user model changes: no - storage format changes: no - interface changes: no Details: - Failed flush operations shouldn't wait for lagging merges since they won't add any new disk components. - Use logger to log exceptions. Change-Id: I915e993a76d5c692a276b1d7f3426a25f910cf46 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1947 Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: abdullah alamoudi <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/fcd89f5a Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/fcd89f5a Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/fcd89f5a Branch: refs/heads/master Commit: fcd89f5a92a80b2b69d99845471856a5909be9a8 Parents: a5e3d0e Author: Murtadha Hubail <[email protected]> Authored: Thu Aug 17 14:35:28 2017 +0300 Committer: Murtadha Hubail <[email protected]> Committed: Thu Aug 17 15:00:35 2017 -0700 ---------------------------------------------------------------------- .../storage/am/lsm/common/impls/LSMHarness.java | 61 +++++++++++++++----- 1 file changed, 45 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fcd89f5a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java index d0dc4b3..35c93ba 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java @@ -120,6 +120,7 @@ public class LSMHarness implements ILSMHarness { // There is only a single component. There is nothing to merge. return false; } + break; default: break; } @@ -161,7 +162,9 @@ public class LSMHarness implements ILSMHarness { } entranceSuccessful = numEntered == components.size(); } catch (Throwable e) { - e.printStackTrace(); + if (LOGGER.isLoggable(Level.SEVERE)) { + LOGGER.log(Level.SEVERE, opType.name() + " failed to enter components on " + lsmIndex, e); + } throw e; } finally { if (!entranceSuccessful) { @@ -223,12 +226,8 @@ public class LSMHarness implements ILSMHarness { */ if (opType == LSMOperationType.FLUSH) { opTracker.notifyAll(); - while (mergePolicy.isMergeLagging(lsmIndex)) { - try { - opTracker.wait(); - } catch (InterruptedException e) { - //ignore - } + if (!failedOperation) { + waitForLaggingMerge(); } } else if (opType == LSMOperationType.MERGE) { opTracker.notifyAll(); @@ -274,7 +273,7 @@ public class LSMHarness implements ILSMHarness { switch (opType) { case FLUSH: // newComponent is null if the flush op. was not performed. - if (newComponent != null) { + if (!failedOperation && newComponent != null) { lsmIndex.addDiskComponent(newComponent); if (replicationEnabled) { componentsToBeReplicated.clear(); @@ -286,7 +285,7 @@ public class LSMHarness implements ILSMHarness { break; case MERGE: // newComponent is null if the merge op. was not performed. - if (newComponent != null) { + if (!failedOperation && newComponent != null) { lsmIndex.subsumeMergedComponents(newComponent, ctx.getComponentHolder()); if (replicationEnabled) { componentsToBeReplicated.clear(); @@ -300,7 +299,9 @@ public class LSMHarness implements ILSMHarness { break; } } catch (Throwable e) { - e.printStackTrace(); + if (LOGGER.isLoggable(Level.SEVERE)) { + LOGGER.log(Level.SEVERE, e.getMessage(), e); + } throw e; } finally { if (failedOperation && (opType == LSMOperationType.MODIFICATION @@ -351,7 +352,9 @@ public class LSMHarness implements ILSMHarness { ((AbstractLSMDiskComponent) c).destroy(); } } catch (Throwable e) { - LOGGER.log(Level.WARNING, "Failure scheduling replication or destroying merged component", e); + if (LOGGER.isLoggable(Level.WARNING)) { + LOGGER.log(Level.WARNING, "Failure scheduling replication or destroying merged component", e); + } throw e; } } @@ -456,7 +459,7 @@ public class LSMHarness implements ILSMHarness { try { exitComponents(ctx, LSMOperationType.SEARCH, null, false); } catch (Exception e) { - throw new HyracksDataException(e); + throw HyracksDataException.create(e); } } } @@ -512,7 +515,9 @@ public class LSMHarness implements ILSMHarness { lsmIndex.markAsValid(newComponent); } catch (Throwable e) { failedOperation = true; - e.printStackTrace(); + if (LOGGER.isLoggable(Level.SEVERE)) { + LOGGER.log(Level.SEVERE, "Flush failed on " + lsmIndex, e); + } throw e; } finally { exitComponents(ctx, LSMOperationType.FLUSH, newComponent, failedOperation); @@ -561,7 +566,9 @@ public class LSMHarness implements ILSMHarness { lsmIndex.markAsValid(newComponent); } catch (Throwable e) { failedOperation = true; - LOGGER.log(Level.SEVERE, "Failed merge operation on " + lsmIndex, e); + if (LOGGER.isLoggable(Level.SEVERE)) { + LOGGER.log(Level.SEVERE, "Failed merge operation on " + lsmIndex, e); + } throw e; } finally { exitComponents(ctx, LSMOperationType.MERGE, newComponent, failedOperation); @@ -616,12 +623,10 @@ public class LSMHarness implements ILSMHarness { @Override public void scheduleReplication(ILSMIndexOperationContext ctx, List<ILSMDiskComponent> lsmComponents, boolean bulkload, LSMOperationType opType) throws HyracksDataException { - //enter the LSM components to be replicated to prevent them from being deleted until they are replicated if (!getAndEnterComponents(ctx, LSMOperationType.REPLICATE, false)) { return; } - lsmIndex.scheduleReplication(ctx, lsmComponents, bulkload, ReplicationOperation.REPLICATE, opType); } @@ -717,6 +722,30 @@ public class LSMHarness implements ILSMHarness { throw HyracksDataException.create(ErrorCode.CANNOT_MODIFY_INDEX_DISK_IS_FULL); } + /** + * Waits for any lagging merge operations to finish to avoid breaking + * the merge policy (i.e. adding a new disk component can make the + * number of mergable immutable components > maxToleranceComponentCount + * by the merge policy) + * + * @throws HyracksDataException + */ + private void waitForLaggingMerge() throws HyracksDataException { + synchronized (opTracker) { + while (mergePolicy.isMergeLagging(lsmIndex)) { + try { + opTracker.wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + if (LOGGER.isLoggable(Level.WARNING)) { + LOGGER.log(Level.WARNING, "Ignoring interrupt while waiting for lagging merge on " + lsmIndex, + e); + } + } + } + } + } + @Override public String toString() { return getClass().getSimpleName() + ":" + lsmIndex;
