jihoonson commented on a change in pull request #8222: Fix bugs in
overshadowableManager and add unit tests
URL: https://github.com/apache/incubator-druid/pull/8222#discussion_r310785630
##########
File path:
core/src/main/java/org/apache/druid/timeline/partition/OvershadowableManager.java
##########
@@ -189,89 +247,359 @@ private void
transitAtomicUpdateGroupState(AtomicUpdateGroup<T> atomicUpdateGrou
}
/**
- * Find all atomicUpdateGroups of the given state overshadowed by the given
rootPartitionRange and minorVersion.
+ * Find all atomicUpdateGroups of the given state overshadowed by the
minorVersion in the given rootPartitionRange.
* The atomicUpdateGroup of a higher minorVersion can have a wider
RootPartitionRange.
* To find all atomicUpdateGroups overshadowed by the given
rootPartitionRange and minorVersion,
* we first need to find the first key contained by the given
rootPartitionRange.
* Once we find such key, then we go through the entire map until we see an
atomicUpdateGroup of which
* rootRangePartition is not contained by the given rootPartitionRange.
+ *
+ * @param rangeOfAug the partition range to search for overshadowed groups.
+ * @param minorVersion the minor version to check overshadow relation. The
found groups will have lower minor versions
+ * than this.
+ * @param fromState the state to search for overshadowed groups.
+ *
+ * @return a list of found atomicUpdateGroups. It could be empty if no
groups are found.
+ */
+ @VisibleForTesting
+ List<AtomicUpdateGroup<T>> findOvershadowedBy(RootPartitionRange rangeOfAug,
short minorVersion, State fromState)
+ {
+ final TreeMap<RootPartitionRange,
Short2ObjectSortedMap<AtomicUpdateGroup<T>>> stateMap = getStateMap(fromState);
+ Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>>
current = findLowestOverlappingEntry(
+ rangeOfAug,
+ stateMap,
+ true
+ );
+
+ if (current == null) {
+ return Collections.emptyList();
+ }
+
+ // Going through the map to find all entries of the RootPartitionRange
contained by the given rangeOfAug.
+ // Note that RootPartitionRange of entries are always consecutive.
+ final List<AtomicUpdateGroup<T>> found = new ArrayList<>();
+ while (current != null && rangeOfAug.overlaps(current.getKey())) {
+ if (rangeOfAug.contains(current.getKey())) {
+ // versionToGroup is sorted by minorVersion.
+ // versionToGroup.headMap(minorVersion) below returns a map containing
all entries of lower minorVersions
+ // than the given minorVersion.
+ final Short2ObjectSortedMap<AtomicUpdateGroup<T>> versionToGroup =
current.getValue();
+ // Short2ObjectRBTreeMap.SubMap.short2ObjectEntrySet() implementation,
especially size(), is not optimized.
+ // Note that size() is indirectly called in ArrayList.addAll() when
ObjectSortedSet.toArray() is called.
+ // See AbstractObjectCollection.toArray().
+ // If you see performance degradation here, probably we need to
improve the below line.
+ if (versionToGroup.firstShortKey() < minorVersion) {
+ found.addAll(versionToGroup.headMap(minorVersion).values());
+ }
+ }
+ current = stateMap.higherEntry(current.getKey());
+ }
+ return found;
+ }
+
+ private List<AtomicUpdateGroup<T>> findOvershadows(AtomicUpdateGroup<T> aug,
State fromState)
+ {
+ return findOvershadows(RootPartitionRange.of(aug), aug.getMinorVersion(),
fromState);
+ }
+
+ /**
+ * Find all atomicUpdateGroups which overshadow others of the given
minorVersion in the given rootPartitionRange.
+ * Similar to {@link #findOvershadowedBy}.
+ *
+ * Note that one atommicUpdateGroup can overshadow multiple other groups. If
you're finding overshadowing
+ * atomicUpdateGroups by calling this method in a loop, the results of this
method can contain duplicate groups.
+ *
+ * @param rangeOfAug the partition range to search for overshadowing
groups.
+ * @param minorVersion the minor version to check overshadow relation. The
found groups will have higher minor
+ * versions than this.
+ * @param fromState the state to search for overshadowed groups.
+ *
+ * @return a list of found atomicUpdateGroups. It could be empty if no
groups are found.
*/
- private List<Short2ObjectMap.Entry<AtomicUpdateGroup<T>>> findOvershadowedBy(
+ @VisibleForTesting
+ List<AtomicUpdateGroup<T>> findOvershadows(RootPartitionRange rangeOfAug,
short minorVersion, State fromState)
+ {
+ final TreeMap<RootPartitionRange,
Short2ObjectSortedMap<AtomicUpdateGroup<T>>> stateMap = getStateMap(fromState);
+ Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>>
current = findLowestOverlappingEntry(
+ rangeOfAug,
+ stateMap,
+ false
+ );
+
+ if (current == null) {
+ return Collections.emptyList();
+ }
+
+ // Going through the map to find all entries of the RootPartitionRange
contains the given rangeOfAug.
+ // Note that RootPartitionRange of entries are always consecutive.
+ final List<AtomicUpdateGroup<T>> found = new ArrayList<>();
+ while (current != null && current.getKey().overlaps(rangeOfAug)) {
+ if (current.getKey().contains(rangeOfAug)) {
+ // versionToGroup is sorted by minorVersion.
+ // versionToGroup.tailMap(minorVersion) below returns a map containing
all entries of equal to or higher
+ // minorVersions than the given minorVersion.
+ final Short2ObjectSortedMap<AtomicUpdateGroup<T>> versionToGroup =
current.getValue();
+ // Short2ObjectRBTreeMap.SubMap.short2ObjectEntrySet() implementation,
especially size(), is not optimized.
+ // Note that size() is indirectly called in ArrayList.addAll() when
ObjectSortedSet.toArray() is called.
+ // See AbstractObjectCollection.toArray().
+ // If you see performance degradation here, probably we need to
improve the below line.
+ if (versionToGroup.lastShortKey() > minorVersion) {
+ found.addAll(versionToGroup.tailMap(minorVersion).values());
+ }
+ }
+ current = stateMap.higherEntry(current.getKey());
+ }
+ return found;
+ }
+
+ private Entry<RootPartitionRange,
Short2ObjectSortedMap<AtomicUpdateGroup<T>>> findLowestOverlappingEntry(
RootPartitionRange rangeOfAug,
- short minorVersion,
- State fromState
+ TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>>
stateMap,
+ boolean strictSameStartId
)
{
- final TreeMap<RootPartitionRange,
Short2ObjectSortedMap<AtomicUpdateGroup<T>>> stateMap = getStateMap(fromState);
Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>>
current = stateMap.floorEntry(rangeOfAug);
if (current == null) {
- return Collections.emptyList();
+ current = stateMap.ceilingEntry(rangeOfAug);
+ }
+
+ if (current == null) {
+ return null;
+ }
+
+ // floorEntry() can return the greatest key less than rangeOfAug. We need
to skip non-overlapping keys.
+ while (current != null && !current.getKey().overlaps(rangeOfAug)) {
+ current = stateMap.higherEntry(current.getKey());
+ }
+
+ final BiPredicate<RootPartitionRange, RootPartitionRange> predicate;
+ if (strictSameStartId) {
+ predicate = (entryRange, groupRange) -> entryRange.startPartitionId ==
groupRange.startPartitionId;
+ } else {
+ predicate = RootPartitionRange::overlaps;
}
- // Find the first key for searching for overshadowed atomicUpdateGroup
- while (true) {
+ // There could be multiple entries of the same startPartitionId but
different endPartitionId.
+ // Find the first key of the same startPartitionId which has the lowest
endPartitionId.
+ while (current != null) {
final Entry<RootPartitionRange,
Short2ObjectSortedMap<AtomicUpdateGroup<T>>> lowerEntry = stateMap.lowerEntry(
current.getKey()
);
- if (lowerEntry != null && lowerEntry.getKey().startPartitionId ==
rangeOfAug.startPartitionId) {
+ if (lowerEntry != null && predicate.test(lowerEntry.getKey(),
rangeOfAug)) {
current = lowerEntry;
} else {
break;
}
}
- // Going through the map to find all entries of the RootPartitionRange
contained by the given rangeOfAug.
- // Note that RootPartitionRange of entries are always consecutive.
- final List<Short2ObjectMap.Entry<AtomicUpdateGroup<T>>> found = new
ArrayList<>();
- while (current != null && rangeOfAug.contains(current.getKey())) {
- // versionToGroup is sorted by minorVersion.
- // versionToGroup.subMap(firstKey, minorVersion) below returns a map
containing all entries of lower minorVersions
- // than the given minorVersion.
- final Short2ObjectSortedMap<AtomicUpdateGroup<T>> versionToGroup =
current.getValue();
- // Short2ObjectRBTreeMap.SubMap.short2ObjectEntrySet() implementation,
especially size(), is not optimized.
- // Note that size() is indirectly called in ArrayList.addAll() when
ObjectSortedSet.toArray() is called.
- // See AbstractObjectCollection.toArray().
- // If you see performance degradation here, probably we need to improve
the below line.
- found.addAll(versionToGroup.subMap(versionToGroup.firstShortKey(),
minorVersion).short2ObjectEntrySet());
- current = stateMap.higherEntry(current.getKey());
- }
- return found;
+ return current;
}
/**
- * Handles addition of the atomicUpdateGroup to the given state
+ * Determine the visible group after a new chunk is added.
*/
- private void transitionStandbyGroupIfFull(AtomicUpdateGroup<T> aug, State
stateOfAug)
+ private void determineVisibleGroupAfterAdd(AtomicUpdateGroup<T> aug, State
stateOfAug)
{
if (stateOfAug == State.STANDBY) {
// A standby atomicUpdateGroup becomes visible when its all segments are
available.
if (aug.isFull()) {
// A visible atomicUpdateGroup becomes overshadowed when a fully
available standby atomicUpdateGroup becomes
// visible which overshadows the current visible one.
- findOvershadowedBy(aug, State.VISIBLE)
- .forEach(entry -> transitAtomicUpdateGroupState(entry.getValue(),
State.VISIBLE, State.OVERSHADOWED));
+ replaceVisibleWith(
+ findOvershadowedBy(aug, State.VISIBLE),
+ State.OVERSHADOWED,
+ Collections.singletonList(aug),
+ State.STANDBY
+ );
findOvershadowedBy(aug, State.STANDBY)
- .forEach(entry -> transitAtomicUpdateGroupState(entry.getValue(),
State.STANDBY, State.OVERSHADOWED));
- transitAtomicUpdateGroupState(aug, State.STANDBY, State.VISIBLE);
+ .forEach(entry -> transitAtomicUpdateGroupState(entry,
State.STANDBY, State.OVERSHADOWED));
+ } else {
+ // The given atomicUpdateGroup is in the standby state which means
it's not overshadowed by the visible group.
+ // If the visible group is not fully available, then the new standby
group should be visible since it has a
+ // higher minor version.
+ checkVisibleIsFullyAvailableAndMoveNewStandbyToVisible(aug,
stateOfAug);
}
+ } else if (stateOfAug == State.OVERSHADOWED) {
+ checkVisibleIsFullyAvailableAndTryToMoveOvershadowedToVisible(aug,
stateOfAug);
}
}
- private void addAtomicUpdateGroupWithState(AtomicUpdateGroup<T> aug, State
state)
+ /**
+ * This method is called in {@link #determineVisibleGroupAfterAdd}. It
checks the current visible group is
+ * fully available, and if not, moves the given group to the visible state.
+ */
+ private void checkVisibleIsFullyAvailableAndMoveNewStandbyToVisible(
Review comment:
Good point. Moved the full standby handling into this method and updated its
javadoc.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]