Repository: asterixdb Updated Branches: refs/heads/master ebde95d63 -> 56d972fa6
[ASTERIXDB-2104][STO] Optimization for Correlated Policy - user model changes: no - storage format changes: no - interface changes: no Details: - Previously, we introduced an optimization to the prefix merge policy by ensuring component size grows exponentially after merging. This patch applies the same optimization to CorrelatedMergePolicy - A little refactoring of PrefixMergePolicy and CorrelatedMergePolicy for code reuse Change-Id: Icd84c77f1e2c34c410508fbc9de70156224ce932 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2019 Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: abdullah alamoudi <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/56d972fa Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/56d972fa Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/56d972fa Branch: refs/heads/master Commit: 56d972fa6dd89769d6ba1220d49e2d54d48657d3 Parents: ebde95d Author: luochen01 <[email protected]> Authored: Thu Sep 21 16:26:58 2017 -0700 Committer: Luo Chen <[email protected]> Committed: Thu Sep 28 13:42:27 2017 -0700 ---------------------------------------------------------------------- .../context/CorrelatedPrefixMergePolicy.java | 221 +++---------------- .../CorrelatedPrefixMergePolicyFactory.java | 17 +- .../CorrelatedPrefixMergePolicyTest.java | 68 +----- .../am/lsm/common/impls/PrefixMergePolicy.java | 18 +- 4 files changed, 41 insertions(+), 283 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/56d972fa/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java index d28b991..5bb9d49 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java @@ -22,26 +22,20 @@ package org.apache.asterix.common.context; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import org.apache.asterix.common.api.IDatasetLifecycleManager; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy; import org.apache.hyracks.storage.am.lsm.common.impls.PrefixMergePolicy; -public class CorrelatedPrefixMergePolicy implements ILSMMergePolicy { - - private long maxMergableComponentSize; - private int maxToleranceComponentCount; +public class CorrelatedPrefixMergePolicy extends PrefixMergePolicy { private final IDatasetLifecycleManager datasetLifecycleManager; private final int datasetId; @@ -61,37 +55,9 @@ public class CorrelatedPrefixMergePolicy implements ILSMMergePolicy { // 2. If a merge from 1 doesn't happen, see if the set of candidate components for merging exceeds MaxTolCompCnt. If so, schedule // a merge all of the current candidates into a new single component. - if (fullMergeIsRequested) { - //full merge request is handled by each index separately, since it is possible that - //when a primary index wants to send full merge requests for all secondaries, - //one secondary index is being merged and the request cannot be scheduled - List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getImmutableComponents()); - if (!areComponentsReadableUnwritableState(immutableComponents)) { - return; - } - - ILSMIndexAccessor accessor = - index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - accessor.scheduleFullMerge(index.getIOOperationCallback()); - return; - } - - if (!index.isPrimaryIndex()) { - return; - } - List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getImmutableComponents()); - if (!areComponentsReadableUnwritableState(immutableComponents)) { - return; + if (fullMergeIsRequested || index.isPrimaryIndex()) { + super.diskComponentAdded(index, fullMergeIsRequested); } - scheduleMerge(index); - } - - @Override - public void configure(Map<String, String> properties) { - maxMergableComponentSize = - Long.parseLong(properties.get(CorrelatedPrefixMergePolicyFactory.KEY_MAX_COMPONENT_SIZE)); - maxToleranceComponentCount = - Integer.parseInt(properties.get(CorrelatedPrefixMergePolicyFactory.KEY_MAX_COMPONENT_COUNT)); } /** @@ -101,106 +67,33 @@ public class CorrelatedPrefixMergePolicy implements ILSMMergePolicy { */ @Override public boolean isMergeLagging(ILSMIndex index) throws HyracksDataException { - /** - * case 1. - * if mergableImmutableCommponentCount < threshold, - * merge operation is not lagged ==> return false. - * case 2. - * if a) mergableImmutableCommponentCount >= threshold && b) there is an ongoing merge, - * merge operation is lagged. ==> return true. - * case 3. *SPECIAL CASE* - * if a) mergableImmutableCommponentCount >= threshold && b) there is *NO* ongoing merge, - * merge operation is lagged. ==> *schedule a merge operation* and then return true. - * This is a special case that requires to schedule a merge operation. - * Otherwise, all flush operations will be hung. - * This case can happen in a following situation: - * The system may crash when - * condition 1) the mergableImmutableCommponentCount >= threshold and - * condition 2) merge operation is going on. - * After the system is recovered, still condition 1) is true. - * If there are flush operations in the same dataset partition after the recovery, - * all these flush operations may not proceed since there is no ongoing merge and - * there will be no new merge either in this situation. - * Note for case 3, we only let the primary index to schedule merge operations on behalf - * of all indexes. - */ - - List<ILSMDiskComponent> immutableComponents = index.getImmutableComponents(); - int mergableImmutableComponentCount = getMergableImmutableComponentCount(immutableComponents); - - // [case 1] - if (mergableImmutableComponentCount < maxToleranceComponentCount) { - return false; - } - - boolean isMergeOngoing = isMergeOngoing(immutableComponents); - - if (isMergeOngoing) { - // [case 2] - return true; - } - if (index.isPrimaryIndex()) { - // [case 3] - // make sure that all components are of READABLE_UNWRITABLE state. - if (!areComponentsReadableUnwritableState(immutableComponents)) { - throw new IllegalStateException(); - } - // schedule a merge operation - boolean isMergeTriggered = scheduleMerge(index); - if (!isMergeTriggered) { - throw new IllegalStateException(); - } - return true; + return super.isMergeLagging(index); } else { - //[case 3] - //if the index is secondary then ignore the merge request (since the merge should be - //triggered by the primary) and here we simply treat it as not lagged. return false; } + } - private boolean scheduleMerge(ILSMIndex index) throws HyracksDataException { + @Override + protected boolean scheduleMerge(final ILSMIndex index) throws HyracksDataException { List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getImmutableComponents()); + // Reverse the components order so that we look at components from oldest to newest. Collections.reverse(immutableComponents); - long totalSize = 0; - int startIndex = -1; - - int numComponents = immutableComponents.size(); - - for (int i = 0; i < numComponents; i++) { - ILSMComponent c = immutableComponents.get(i); - long componentSize = ((ILSMDiskComponent) c).getComponentSize(); - if (componentSize > maxMergableComponentSize || ((ILSMDiskComponent) c).getComponentId().notFound()) { - startIndex = i; - totalSize = 0; - continue; - } - totalSize += componentSize; - boolean isLastComponent = i + 1 == numComponents ? true : false; - if (totalSize > maxMergableComponentSize - || (isLastComponent && i - startIndex >= maxToleranceComponentCount)) { - //merge disk components from startIndex+1 to i - long minID = Long.MAX_VALUE; - long maxID = Long.MIN_VALUE; - for (int j = startIndex + 1; j <= i; j++) { - ILSMDiskComponentId id = immutableComponents.get(j).getComponentId(); - if (minID > id.getMinId()) { - minID = id.getMinId(); - } - if (maxID < id.getMaxId()) { - maxID = id.getMaxId(); - } - } - Set<IndexInfo> indexInfos = datasetLifecycleManager.getDatasetInfo(datasetId).getDatsetIndexInfos(); - int partition = getIndexPartition(index, indexInfos); - triggerScheduledMerge(minID, maxID, indexInfos.stream().filter(info -> info.getPartition() == partition) - .collect(Collectors.toSet())); - return true; - } + Pair<Integer, Integer> mergeableIndexes = getMergableComponentsIndex(immutableComponents); + if (mergeableIndexes == null) { + //nothing to merge + return false; } - return false; + long minID = immutableComponents.get(mergeableIndexes.getLeft()).getComponentId().getMinId(); + long maxID = immutableComponents.get(mergeableIndexes.getRight()).getComponentId().getMaxId(); + + Set<IndexInfo> indexInfos = datasetLifecycleManager.getDatasetInfo(datasetId).getDatsetIndexInfos(); + int partition = getIndexPartition(index, indexInfos); + triggerScheduledMerge(minID, maxID, + indexInfos.stream().filter(info -> info.getPartition() == partition).collect(Collectors.toSet())); + return true; } /** @@ -224,15 +117,13 @@ public class CorrelatedPrefixMergePolicy implements ILSMMergePolicy { List<ILSMDiskComponent> mergableComponents = new ArrayList<>(); for (ILSMDiskComponent component : immutableComponents) { ILSMDiskComponentId id = component.getComponentId(); - if (!id.notFound()) { - if (id.getMinId() >= minID && id.getMaxId() <= maxID) { - mergableComponents.add(component); - } - if (id.getMaxId() < minID) { - //disk components are ordered from latest (with largest IDs) to oldest (with smallest IDs) - //if the component.maxID < minID, we can safely skip the rest disk components in the list - break; - } + if (id.getMinId() >= minID && id.getMaxId() <= maxID) { + mergableComponents.add(component); + } + if (id.getMaxId() < minID) { + //disk components are ordered from latest (with largest IDs) to oldest (with smallest IDs) + //if the component.maxID < minID, we can safely skip the rest disk components in the list + break; } } ILSMIndexAccessor accessor = @@ -241,62 +132,6 @@ public class CorrelatedPrefixMergePolicy implements ILSMMergePolicy { } } - /** - * This method returns the number of mergable components among the given list - * of immutable components that are ordered from the latest component to order ones. A caller - * need to make sure the order in the list. - * - * @param immutableComponents - * @return the number of mergable component - * @throws HyracksDataException - */ - private int getMergableImmutableComponentCount(List<ILSMDiskComponent> immutableComponents) - throws HyracksDataException { - int count = 0; - for (ILSMComponent c : immutableComponents) { - long componentSize = ((ILSMDiskComponent) c).getComponentSize(); - //stop when the first non-mergable component is found. - if (c.getState() != ComponentState.READABLE_UNWRITABLE || componentSize > maxMergableComponentSize - || ((ILSMDiskComponent) c).getComponentId().notFound()) { - break; - } - ++count; - } - return count; - } - - /** - * This method returns whether there is an ongoing merge operation or not by checking - * each component state of given components. - * - * @param immutableComponents - * @return true if there is an ongoing merge operation, false otherwise. - */ - private boolean isMergeOngoing(List<ILSMDiskComponent> immutableComponents) { - int size = immutableComponents.size(); - for (int i = 0; i < size; i++) { - if (immutableComponents.get(i).getState() == ComponentState.READABLE_MERGING) { - return true; - } - } - return false; - } - - /** - * checks whether all given components are of READABLE_UNWRITABLE state - * - * @param immutableComponents - * @return true if all components are of READABLE_UNWRITABLE state, false otherwise. - */ - private boolean areComponentsReadableUnwritableState(List<ILSMDiskComponent> immutableComponents) { - for (ILSMComponent c : immutableComponents) { - if (c.getState() != ComponentState.READABLE_UNWRITABLE) { - return false; - } - } - return true; - } - private int getIndexPartition(ILSMIndex index, Set<IndexInfo> indexInfos) { for (IndexInfo info : indexInfos) { if (info.getIndex() == index) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/56d972fa/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java index 3c141bc..25242d4 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java @@ -19,27 +19,19 @@ package org.apache.asterix.common.context; -import java.util.Arrays; -import java.util.HashSet; import java.util.Map; -import java.util.Set; import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; +import org.apache.hyracks.storage.am.lsm.common.impls.PrefixMergePolicyFactory; -public class CorrelatedPrefixMergePolicyFactory implements ILSMMergePolicyFactory { +public class CorrelatedPrefixMergePolicyFactory extends PrefixMergePolicyFactory { private static final long serialVersionUID = 1L; public static final String NAME = "correlated-prefix"; public static final String KEY_DATASET_ID = "datasetId"; - public static final String KEY_MAX_COMPONENT_SIZE = "max-mergable-component-size"; - public static final String KEY_MAX_COMPONENT_COUNT = "max-tolerance-component-count"; - - private static final String[] SET_VALUES = new String[] { KEY_MAX_COMPONENT_SIZE, KEY_MAX_COMPONENT_COUNT }; - private static final Set<String> PROPERTIES_NAMES = new HashSet<>(Arrays.asList(SET_VALUES)); @Override public String getName() { @@ -47,11 +39,6 @@ public class CorrelatedPrefixMergePolicyFactory implements ILSMMergePolicyFactor } @Override - public Set<String> getPropertiesNames() { - return PROPERTIES_NAMES; - } - - @Override public ILSMMergePolicy createMergePolicy(Map<String, String> configuration, INCServiceContext ctx) { IDatasetLifecycleManager dslcManager = ((INcApplicationContext) ctx.getApplicationContext()).getDatasetLifecycleManager(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/56d972fa/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java index e36f4a8..c18ecc2 100644 --- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java +++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java @@ -29,7 +29,6 @@ import java.util.Set; import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.context.CorrelatedPrefixMergePolicy; -import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory; import org.apache.asterix.common.context.DatasetInfo; import org.apache.asterix.common.context.IndexInfo; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -154,69 +153,6 @@ public class CorrelatedPrefixMergePolicyTest extends TestCase { } @Test - public void testPrimaryNotFound() { - try { - List<ILSMDiskComponentId> primaryComponentIDs = Arrays.asList(new LSMDiskComponentId(40, 50), - new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29), - new LSMDiskComponentId(ILSMDiskComponentId.NOT_FOUND, ILSMDiskComponentId.NOT_FOUND), - new LSMDiskComponentId(10, 19)); - List<ILSMDiskComponentId> resultPrimaryIDs = new ArrayList<>(); - IndexInfo primary = mockIndex(true, primaryComponentIDs, resultPrimaryIDs, 0); - - List<ILSMDiskComponentId> secondaryComponentIDs = Arrays.asList(new LSMDiskComponentId(30, 35), - new LSMDiskComponentId(25, 29), new LSMDiskComponentId(20, 24)); - List<ILSMDiskComponentId> resultSecondaryIDs = new ArrayList<>(); - IndexInfo secondary = mockIndex(false, secondaryComponentIDs, resultSecondaryIDs, 0); - - ILSMMergePolicy policy = mockMergePolicy(primary, secondary); - - policy.diskComponentAdded(secondary.getIndex(), false); - Assert.assertTrue(resultPrimaryIDs.isEmpty()); - Assert.assertTrue(resultSecondaryIDs.isEmpty()); - - policy.diskComponentAdded(primary.getIndex(), false); - Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(40, 50), new LSMDiskComponentId(30, 35), - new LSMDiskComponentId(25, 29)), resultPrimaryIDs); - Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29)), - resultSecondaryIDs); - } catch (HyracksDataException e) { - Assert.fail(e.getMessage()); - } - } - - @Test - public void testSecondaryNotFound() { - try { - List<ILSMDiskComponentId> primaryComponentIDs = Arrays.asList(new LSMDiskComponentId(40, 50), - new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29), new LSMDiskComponentId(20, 24), - new LSMDiskComponentId(10, 19)); - List<ILSMDiskComponentId> resultPrimaryIDs = new ArrayList<>(); - IndexInfo primary = mockIndex(true, primaryComponentIDs, resultPrimaryIDs, 0); - - List<ILSMDiskComponentId> secondaryComponentIDs = Arrays.asList(new LSMDiskComponentId(30, 35), - new LSMDiskComponentId(ILSMDiskComponentId.NOT_FOUND, ILSMDiskComponentId.NOT_FOUND), - new LSMDiskComponentId(20, 24)); - List<ILSMDiskComponentId> resultSecondaryIDs = new ArrayList<>(); - IndexInfo secondary = mockIndex(false, secondaryComponentIDs, resultSecondaryIDs, 0); - - ILSMMergePolicy policy = mockMergePolicy(primary, secondary); - - policy.diskComponentAdded(secondary.getIndex(), false); - Assert.assertTrue(resultPrimaryIDs.isEmpty()); - Assert.assertTrue(resultSecondaryIDs.isEmpty()); - - policy.diskComponentAdded(primary.getIndex(), false); - Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29), - new LSMDiskComponentId(20, 24), new LSMDiskComponentId(10, 19)), resultPrimaryIDs); - Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(20, 24)), - resultSecondaryIDs); - - } catch (HyracksDataException e) { - Assert.fail(e.getMessage()); - } - } - - @Test public void testMultiPartition() { try { List<ILSMDiskComponentId> componentIDs = Arrays.asList(new LSMDiskComponentId(40, 50), @@ -251,8 +187,8 @@ public class CorrelatedPrefixMergePolicyTest extends TestCase { private ILSMMergePolicy mockMergePolicy(IndexInfo... indexes) { Map<String, String> properties = new HashMap<>(); - properties.put(CorrelatedPrefixMergePolicyFactory.KEY_MAX_COMPONENT_COUNT, String.valueOf(MAX_COMPONENT_COUNT)); - properties.put(CorrelatedPrefixMergePolicyFactory.KEY_MAX_COMPONENT_SIZE, String.valueOf(MAX_COMPONENT_SIZE)); + properties.put("max-tolerance-component-count", String.valueOf(MAX_COMPONENT_COUNT)); + properties.put("max-mergable-component-size", String.valueOf(MAX_COMPONENT_SIZE)); Set<IndexInfo> indexInfos = new HashSet<>(); for (IndexInfo info : indexes) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/56d972fa/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java index 329e4fb..6878910 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java @@ -35,8 +35,8 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy; public class PrefixMergePolicy implements ILSMMergePolicy { - private long maxMergableComponentSize; - private int maxToleranceComponentCount; + protected long maxMergableComponentSize; + protected int maxToleranceComponentCount; /** * This parameter is used to avoid merging a big component with a sequence of small components. @@ -178,7 +178,7 @@ public class PrefixMergePolicy implements ILSMMergePolicy { * @param immutableComponents * @return true if there is an ongoing merge operation, false otherwise. */ - private boolean isMergeOngoing(List<ILSMDiskComponent> immutableComponents) { + protected boolean isMergeOngoing(List<ILSMDiskComponent> immutableComponents) { int size = immutableComponents.size(); for (int i = 0; i < size; i++) { if (immutableComponents.get(i).getState() == ComponentState.READABLE_MERGING) { @@ -196,7 +196,7 @@ public class PrefixMergePolicy implements ILSMMergePolicy { * @param immutableComponents * @return the number of mergable component */ - private int getMergableImmutableComponentCount(List<ILSMDiskComponent> immutableComponents) { + protected int getMergableImmutableComponentCount(List<ILSMDiskComponent> immutableComponents) { Pair<Integer, Integer> mergableIndexes = getMergableComponentsIndex(immutableComponents); return mergableIndexes == null ? 0 : mergableIndexes.getRight() - mergableIndexes.getLeft() + 1; } @@ -207,7 +207,7 @@ public class PrefixMergePolicy implements ILSMMergePolicy { * @param immutableComponents * @return true if all components are of READABLE_UNWRITABLE state, false otherwise. */ - private boolean areComponentsReadableWritableState(List<ILSMDiskComponent> immutableComponents) { + protected boolean areComponentsReadableWritableState(List<ILSMDiskComponent> immutableComponents) { for (ILSMComponent c : immutableComponents) { if (c.getState() != ComponentState.READABLE_UNWRITABLE) { return false; @@ -224,21 +224,21 @@ public class PrefixMergePolicy implements ILSMMergePolicy { * @throws HyracksDataException * @throws IndexException */ - private boolean scheduleMerge(final ILSMIndex index) throws HyracksDataException { + protected boolean scheduleMerge(final ILSMIndex index) throws HyracksDataException { List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getImmutableComponents()); // Reverse the components order so that we look at components from oldest to newest. Collections.reverse(immutableComponents); Pair<Integer, Integer> mergeableIndexes = getMergableComponentsIndex(immutableComponents); if (mergeableIndexes != null) { - scheduleMerge(index, immutableComponents, mergeableIndexes.getLeft(), mergeableIndexes.getRight()); + triggerScheduleMerge(index, immutableComponents, mergeableIndexes.getLeft(), mergeableIndexes.getRight()); return true; } else { return false; } } - private void scheduleMerge(ILSMIndex index, List<ILSMDiskComponent> immutableComponents, int startIndex, + private void triggerScheduleMerge(ILSMIndex index, List<ILSMDiskComponent> immutableComponents, int startIndex, int endIndex) throws HyracksDataException { List<ILSMDiskComponent> mergableComponents = new ArrayList<>(immutableComponents.subList(startIndex, endIndex + 1)); @@ -265,7 +265,7 @@ public class PrefixMergePolicy implements ILSMMergePolicy { * @return a pair of indexes indicating the start and end position of the sequence * otherwise, return null if no sequence is found */ - private Pair<Integer, Integer> getMergableComponentsIndex(List<ILSMDiskComponent> immutableComponents) { + protected Pair<Integer, Integer> getMergableComponentsIndex(List<ILSMDiskComponent> immutableComponents) { int numComponents = immutableComponents.size(); for (int i = 0; i < numComponents; i++) { if (immutableComponents.get(i).getComponentSize() > maxMergableComponentSize
