jon-wei commented on a change in pull request #7547: Add support minor compaction with segment locking URL: https://github.com/apache/incubator-druid/pull/7547#discussion_r294552695
########## File path: core/src/main/java/org/apache/druid/timeline/partition/OvershadowableManager.java ########## @@ -0,0 +1,780 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.timeline.partition; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; +import it.unimi.dsi.fastutil.objects.AbstractObjectCollection; +import it.unimi.dsi.fastutil.objects.ObjectCollection; +import it.unimi.dsi.fastutil.objects.ObjectIterator; +import it.unimi.dsi.fastutil.objects.ObjectIterators; +import it.unimi.dsi.fastutil.objects.ObjectSortedSet; +import it.unimi.dsi.fastutil.objects.ObjectSortedSets; +import it.unimi.dsi.fastutil.shorts.AbstractShort2ObjectSortedMap; +import it.unimi.dsi.fastutil.shorts.Short2ObjectMap; +import it.unimi.dsi.fastutil.shorts.Short2ObjectRBTreeMap; +import it.unimi.dsi.fastutil.shorts.Short2ObjectSortedMap; +import it.unimi.dsi.fastutil.shorts.ShortComparator; +import it.unimi.dsi.fastutil.shorts.ShortComparators; +import it.unimi.dsi.fastutil.shorts.ShortSortedSet; +import it.unimi.dsi.fastutil.shorts.ShortSortedSets; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.timeline.Overshadowable; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.TreeMap; +import java.util.stream.Collectors; + +/** + * OvershadowableManager manages the state of {@link AtomicUpdateGroup}. See the below {@link State} for details of + * the possible state. + * Note that an AtomicUpdateGroup can consist of {@link Overshadowable}s of the same majorVersion, minorVersion, + * rootPartition range, and atomicUpdateGroupSize. + * In {@link org.apache.druid.timeline.VersionedIntervalTimeline}, this class is used to manage segments in the same + * timeChunk. + * + * This class is not thread-safe. + */ +class OvershadowableManager<T extends Overshadowable<T>> +{ + private enum State + { + STANDBY, // have atomicUpdateGroup of higher versions than visible + VISIBLE, // have a single fully available atomicUpdateGroup of highest version + OVERSHADOWED // have atomicUpdateGroup of lower versions than visible + } + + private final Map<Integer, PartitionChunk<T>> knownPartitionChunks; // served segments + + // (start partitionId, end partitionId) -> minorVersion -> atomicUpdateGroup + private final TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> standbyGroups; + private final TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> visibleGroup; + private final TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> overshadowedGroups; + + OvershadowableManager() + { + this.knownPartitionChunks = new HashMap<>(); + this.standbyGroups = new TreeMap<>(); + this.visibleGroup = new TreeMap<>(); + this.overshadowedGroups = new TreeMap<>(); + } + + OvershadowableManager(OvershadowableManager<T> other) + { + this.knownPartitionChunks = new HashMap<>(other.knownPartitionChunks); + this.standbyGroups = new TreeMap<>(other.standbyGroups); + this.visibleGroup = new TreeMap<>(other.visibleGroup); + this.overshadowedGroups = new TreeMap<>(other.overshadowedGroups); + } + + private TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> getStateMap(State state) + { + switch (state) { + case STANDBY: + return standbyGroups; + case VISIBLE: + return visibleGroup; + case OVERSHADOWED: + return overshadowedGroups; + default: + throw new ISE("Unknown state[%s]", state); + } + } + + private Short2ObjectSortedMap<AtomicUpdateGroup<T>> createMinorVersionToAugMap(State state) + { + switch (state) { + case STANDBY: + case OVERSHADOWED: + return new Short2ObjectRBTreeMap<>(); + case VISIBLE: + return new SingleEntryShort2ObjectSortedMap<>(); + default: + throw new ISE("Unknown state[%s]", state); + } + } + + private void transitPartitionChunkState(AtomicUpdateGroup<T> atomicUpdateGroup, State from, State to) + { + Preconditions.checkNotNull(atomicUpdateGroup, "atomicUpdateGroup"); + Preconditions.checkArgument(!atomicUpdateGroup.isEmpty(), "empty atomicUpdateGroup"); + + removeFrom(atomicUpdateGroup, from); + addTo(atomicUpdateGroup, to); + } + + @Nullable + private AtomicUpdateGroup<T> searchForStateOf(PartitionChunk<T> chunk, State state) + { + final Short2ObjectSortedMap<AtomicUpdateGroup<T>> versionToGroup = getStateMap(state).get( + RootPartitionRange.of(chunk) + ); + if (versionToGroup != null) { + final AtomicUpdateGroup<T> atomicUpdateGroup = versionToGroup.get(chunk.getObject().getMinorVersion()); + if (atomicUpdateGroup != null) { + return atomicUpdateGroup; + } + } + return null; + } + + /** + * Returns null if atomicUpdateGroup is not found for the state. + * Can return an empty atomicUpdateGroup. + */ + @Nullable + private AtomicUpdateGroup<T> tryRemoveFromState(PartitionChunk<T> chunk, State state) + { + final RootPartitionRange rangeKey = RootPartitionRange.of(chunk); + final Short2ObjectSortedMap<AtomicUpdateGroup<T>> versionToGroup = getStateMap(state).get(rangeKey); + if (versionToGroup != null) { + final AtomicUpdateGroup<T> atomicUpdateGroup = versionToGroup.get(chunk.getObject().getMinorVersion()); + if (atomicUpdateGroup != null) { + atomicUpdateGroup.remove(chunk); + if (atomicUpdateGroup.isEmpty()) { + versionToGroup.remove(chunk.getObject().getMinorVersion()); + if (versionToGroup.isEmpty()) { + getStateMap(state).remove(rangeKey); + } + } + + handleRemove(atomicUpdateGroup, RootPartitionRange.of(chunk), chunk.getObject().getMinorVersion(), state); + return atomicUpdateGroup; + } + } + return null; + } + + private List<Short2ObjectMap.Entry<AtomicUpdateGroup<T>>> findOvershadowedBy( + AtomicUpdateGroup<T> aug, + State fromState + ) + { + final RootPartitionRange rangeKeyOfGivenAug = RootPartitionRange.of(aug); + return findOvershadowedBy(rangeKeyOfGivenAug, aug.getMinorVersion(), fromState); + } + + private List<Short2ObjectMap.Entry<AtomicUpdateGroup<T>>> findOvershadowedBy( + RootPartitionRange rangeOfAug, + short minorVersion, + State fromState + ) + { + Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> current = getStateMap(fromState) + .floorEntry(rangeOfAug); + + if (current == null) { + return Collections.emptyList(); + } + + // Find the first key for searching for overshadowed atomicUpdateGroup + while (true) { + final Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> lowerEntry = getStateMap(fromState) + .lowerEntry(current.getKey()); + if (lowerEntry != null && lowerEntry.getKey().startPartitionId == rangeOfAug.startPartitionId) { + current = lowerEntry; + } else { + break; + } + } + + final List<Short2ObjectMap.Entry<AtomicUpdateGroup<T>>> found = new ArrayList<>(); + while (current != null && rangeOfAug.contains(current.getKey())) { + final Short2ObjectSortedMap<AtomicUpdateGroup<T>> versionToGroup = current.getValue(); + found.addAll(versionToGroup.subMap(versionToGroup.firstShortKey(), minorVersion).short2ObjectEntrySet()); + current = getStateMap(fromState).higherEntry(current.getKey()); + } + return found; + } + + /** + * Handles addition of the atomicUpdateGroup to the given state + */ + private void handleAdd(AtomicUpdateGroup<T> aug, State newStateOfAug) + { + if (newStateOfAug == State.STANDBY) { + // A standby atomicUpdateGroup becomes visible when its all segments are available. + if (aug.isFull()) { + // A visible atomicUpdateGroup becomes overshadowed when a fully available standby atomicUpdateGroup becomes + // visible which overshadows the current visible one. + findOvershadowedBy(aug, State.VISIBLE) + .forEach(entry -> transitPartitionChunkState(entry.getValue(), State.VISIBLE, State.OVERSHADOWED)); + findOvershadowedBy(aug, State.STANDBY) + .forEach(entry -> transitPartitionChunkState(entry.getValue(), State.STANDBY, State.OVERSHADOWED)); + transitPartitionChunkState(aug, State.STANDBY, State.VISIBLE); + } + } + } + + private void addTo(AtomicUpdateGroup<T> aug, State state) + { + final AtomicUpdateGroup<T> existing = getStateMap(state) + .computeIfAbsent(RootPartitionRange.of(aug), k -> createMinorVersionToAugMap(state)) + .put(aug.getMinorVersion(), aug); + + if (existing != null) { + throw new ISE("AtomicUpdateGroup[%s] is already in state[%s]", aug, state); + } + + handleAdd(aug, state); + } + + public void add(PartitionChunk<T> chunk) + { + final PartitionChunk<T> existingChunk = knownPartitionChunks.put(chunk.getChunkNumber(), chunk); + if (existingChunk != null && !existingChunk.equals(chunk)) { Review comment: Can you add a comment about when add might be called with a chunk that's already known? (e.g., handling replicas) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
