imply-cheddar commented on code in PR #12277:
URL: https://github.com/apache/druid/pull/12277#discussion_r979524132


##########
core/src/main/java/org/apache/druid/segment/data/VByte.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.data;
+
+import java.nio.ByteBuffer;
+
+public class VByte
+{
+  /**
+   * Read a variable byte (vbyte) encoded integer from a {@link ByteBuffer} at 
the current position.
+   *
+   * vbyte encoding stores values in the last 7 bits of a byte and reserves 
the high bit for the 'continuation'. If 0,
+   * one or more additional bytes must be read to complete the value, and a 1 
indicates the terminal byte. Because of
+   * this, it can only store positive values, and larger integers can take up 
to 5 bytes.
+   *
+   * implementation based on:
+   * 
https://github.com/lemire/JavaFastPFOR/blob/master/src/main/java/me/lemire/integercompression/VariableByte.java
+   *
+   */
+  public static int readInt(ByteBuffer buffer)

Review Comment:
   An interface that relies on the position of the `ByteBuffer` to do reads 
often generates a lot of garbage (extra `ByteBuffer` objects) as people use it, 
because callers end up duplicating the buffer just to avoid disturbing its 
position.  In terms of the API, it's better to take a buffer and a position and 
do positional reads, as that can be used just as easily by optimized code as by 
less optimized code.
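   
   A rough sketch of what a positional variant could look like.  The names 
`PositionalVByte`, `writeInt`, and `encodedSize` are made up for this example 
and are not part of the PR.  It assumes the encoding described in the javadoc 
above (7 value bits per byte, high bit set only on the terminal byte) with the 
low-order group written first, and it does no validation of malformed input.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch, not the PR's VByte class.
final class PositionalVByte
{
  /** Writes value with absolute puts and returns the number of bytes written. */
  static int writeInt(ByteBuffer buffer, int position, int value)
  {
    int pos = position;
    // emit 7 bits at a time, low-order group first; a clear high bit means "more bytes follow"
    while ((value & ~0x7F) != 0) {
      buffer.put(pos++, (byte) (value & 0x7F));
      value >>>= 7;
    }
    // the terminal byte carries the high bit
    buffer.put(pos++, (byte) (value | 0x80));
    return pos - position;
  }

  /** Reads a value with absolute gets; the buffer's own position is never touched. */
  static int readInt(ByteBuffer buffer, int position)
  {
    int value = 0;
    int shift = 0;
    int pos = position;
    byte b;
    do {
      b = buffer.get(pos++);
      value |= (b & 0x7F) << shift;
      shift += 7;
    } while (b >= 0); // b >= 0 means the high bit is clear, so keep reading
    return value;
  }

  /** Number of bytes the encoding of value occupies, so callers can advance their own offset. */
  static int encodedSize(int value)
  {
    int size = 1;
    while ((value & ~0x7F) != 0) {
      size++;
      value >>>= 7;
    }
    return size;
  }
}
```

   A caller that needs to move on to the next field can add 
`encodedSize(value)` to its own offset, so no duplicate buffer is needed.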



##########
processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexed.java:
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.data;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * {@link Indexed} specialized for storing variable-width binary values (such 
as utf8 encoded strings), which must be
+ * sorted and unique, using 'front coding'. Front coding is a type of delta 
encoding for byte arrays, where sorted
+ * values are grouped into buckets. The first value of the bucket is written 
entirely, and remaining values are stored
+ * as a pair of an integer which indicates how much of the first byte array of 
the bucket to use as a prefix, followed
+ * by the remaining bytes after the prefix to complete the value.
+ *
+ * Getting a value first picks the appropriate bucket, finds its offset in the 
underlying buffer, then scans the bucket
+ * values to seek to the correct position of the value within the bucket in 
order to reconstruct it using the prefix
+ * length.
+ *
+ * Finding the index of a value involves binary searching the first values of 
each bucket to find the correct bucket,
+ * then a linear scan within the bucket to find the matching value (or 
negative insertion point -1 for values that
+ * are not present).
+ *
+ * The value iterator reads an entire bucket at a time, reconstructing the 
values into an array to iterate within the
+ * bucket before moving onto the next bucket as the iterator is consumed.
+ */
+public final class FrontCodedIndexed implements Indexed<ByteBuffer>
+{
+  public static FrontCodedIndexed read(ByteBuffer buffer, 
Comparator<ByteBuffer> comparator, ByteOrder ordering)
+  {
+    final ByteBuffer copy = buffer.asReadOnlyBuffer().order(ordering);
+    final byte version = copy.get();
+    Preconditions.checkArgument(version == 0, "only V0 exists, encountered " + 
version);
+    final int bucketSize = copy.get();
+    final boolean hasNull = NullHandling.IS_NULL_BYTE == copy.get();
+    final int numValues = VByte.readInt(copy);
+    // size of offsets + values
+    final int size = VByte.readInt(copy);
+    // move position to end of buffer
+    buffer.position(copy.position() + size);
+
+    final int numBuckets = (int) Math.ceil((double) numValues / (double) 
bucketSize);
+    final int adjustIndex = hasNull ? 1 : 0;
+    final int div = Integer.numberOfTrailingZeros(bucketSize);
+    final int rem = bucketSize - 1;
+    return new FrontCodedIndexed(
+        copy,
+        comparator,
+        bucketSize,
+        numBuckets,
+        (numValues & rem) == 0 ? bucketSize : numValues & rem,
+        hasNull,
+        numValues + adjustIndex,
+        adjustIndex,
+        div,
+        rem,
+        copy.position(),
+        copy.position() + ((numBuckets - 1) * Integer.BYTES)
+    );
+  }
+
+  private final ByteBuffer buffer;
+  private final int adjustedNumValues;
+  private final int adjustIndex;
+  private final int bucketSize;
+  private final int numBuckets;
+  private final int div;
+  private final int rem;
+  private final int offsetsPosition;
+  private final int bucketsPosition;
+  private final boolean hasNull;
+  private final int lastBucketNumValues;
+  private final Comparator<ByteBuffer> comparator;
+
+  private FrontCodedIndexed(
+      ByteBuffer buffer,
+      Comparator<ByteBuffer> comparator,
+      int bucketSize,
+      int numBuckets,
+      int lastBucketNumValues,
+      boolean hasNull,
+      int adjustedNumValues,
+      int adjustIndex,
+      int div,
+      int rem,
+      int offsetsPosition,
+      int bucketsPosition
+  )
+  {
+    if (Integer.bitCount(bucketSize) != 1) {
+      throw new ISE("bucketSize must be a power of two but was[%,d]", 
bucketSize);
+    }
+    this.buffer = buffer;
+    this.comparator = comparator;
+    this.bucketSize = bucketSize;
+    this.hasNull = hasNull;
+    this.adjustedNumValues = adjustedNumValues;
+    this.adjustIndex = adjustIndex;
+    this.div = div;
+    this.rem = rem;
+    this.numBuckets = numBuckets;
+    this.offsetsPosition = offsetsPosition;
+    this.bucketsPosition = bucketsPosition;
+    this.lastBucketNumValues = lastBucketNumValues;
+  }
+
+  @Override
+  public int size()
+  {
+    return adjustedNumValues;
+  }
+
+  @Nullable
+  @Override
+  public ByteBuffer get(int index)
+  {
+    final int adjustedIndex;
+    // due to vbyte encoding, the null value is not actually stored in the 
bucket (no negative values), so we adjust
+    // the index
+    if (hasNull) {
+      if (index == 0) {
+        return null;
+      } else {
+        adjustedIndex = index - 1;
+      }
+    } else {
+      adjustedIndex = index;
+    }
+    // find the bucket which contains the value with maths
+    final int offsetNum = adjustedIndex >> div;
+    final int bucketIndex = adjustedIndex & rem;
+    // get offset of that bucket in the value buffer
+    final int offset = offsetNum > 0 ? buffer.getInt(offsetsPosition + 
((offsetNum - 1) * Integer.BYTES)) : 0;
+    ByteBuffer copy = buffer.asReadOnlyBuffer().order(buffer.order());
+    copy.position(bucketsPosition + offset);
+    return getFromBucket(copy, bucketIndex);
+  }
+
+  @Override
+  public int indexOf(@Nullable ByteBuffer value)
+  {
+    // performs binary search using the first values of each bucket to locate 
the appropriate bucket, and then does
+    // a linear scan to find the value within the bucket
+    ByteBuffer copy = buffer.asReadOnlyBuffer().order(buffer.order());
+    if (value == null) {
+      return hasNull ? 0 : -1;
+    }
+    int minBucketIndex = 0;
+    int maxBucketIndex = numBuckets - 1;
+    while (minBucketIndex < maxBucketIndex) {
+      int currBucketIndex = (minBucketIndex + maxBucketIndex) >>> 1;
+      int currBucketFirstValueIndex = currBucketIndex * bucketSize;
+
+      final ByteBuffer currBucketFirstValue = get(currBucketFirstValueIndex + 
adjustIndex);
+      // we compare against the adjacent bucket to determine if the value is 
actually in this bucket or if we need
+      // to keep searching buckets
+      final ByteBuffer nextBucketFirstValue = get(currBucketFirstValueIndex + 
bucketSize + adjustIndex);
+      int comparison = comparator.compare(currBucketFirstValue, value);
+      if (comparison == 0) {
+        // it turns out that the first value in current bucket is what we are 
looking for, short circuit
+        return currBucketFirstValueIndex + adjustIndex;
+      }
+
+      int comparisonNext = comparator.compare(nextBucketFirstValue, value);
+      if (comparison < 0 && comparisonNext > 0) {
+        // this is exactly the right bucket
+        final int offset = currBucketIndex > 0 ? copy.getInt(offsetsPosition + 
((currBucketIndex - 1) * Integer.BYTES)) : 0;
+        copy.position(bucketsPosition + offset);
+        // find the value in the bucket (or where it would be if it were 
present)
+        final int pos = findInBucket(copy, bucketSize, value, comparator);
+        if (pos < 0) {
+          return -currBucketFirstValueIndex + pos - adjustIndex;
+        }
+        return currBucketFirstValueIndex + pos + adjustIndex;
+      } else if (comparison < 0) {
+        minBucketIndex = currBucketIndex + 1;
+      } else {
+        maxBucketIndex = currBucketIndex - 1;
+      }
+    }
+
+    // this is where we ended up, try to find the value in the bucket
+    final int bucketIndexBase = minBucketIndex * bucketSize;
+    final int numValuesInBucket;
+    if (minBucketIndex == numBuckets - 1) {
+      numValuesInBucket = lastBucketNumValues;
+    } else {
+      numValuesInBucket = bucketSize;
+    }
+    final int offset;
+    if (minBucketIndex > 0) {
+      offset = copy.getInt(offsetsPosition + ((minBucketIndex - 1) * 
Integer.BYTES));
+    } else {
+      offset = 0;
+    }
+    copy.position(bucketsPosition + offset);
+    final int pos = findInBucket(copy, numValuesInBucket, value, comparator);
+    if (pos < 0) {
+      return -bucketIndexBase + pos - adjustIndex;
+    }
+    return bucketIndexBase + pos + adjustIndex;
+  }
+
+  @Override
+  public Iterator<ByteBuffer> iterator()
+  {
+    ByteBuffer copy = buffer.asReadOnlyBuffer().order(buffer.order());
+    copy.position(bucketsPosition);
+    final ByteBuffer[] firstBucket = readBucket(copy, numBuckets > 1 ? 
bucketSize : lastBucketNumValues);
+    // iterator decodes and buffers a bucket at a time, paging through buckets 
as the iterator is consumed
+    return new Iterator<ByteBuffer>()
+    {
+      private int currIndex = 0;
+      private int currentBucketIndex = 0;
+      private ByteBuffer[] currentBucket = firstBucket;
+
+      @Override
+      public boolean hasNext()
+      {
+        return currIndex < adjustedNumValues;
+      }
+
+      @Override
+      public ByteBuffer next()
+      {
+        // null is handled special
+        if (hasNull && currIndex == 0) {
+          currIndex++;
+          return null;
+        }
+        if (!hasNext()) {
+          throw new NoSuchElementException();
+        }
+        final int adjustedCurrIndex = hasNull ? currIndex - 1 : currIndex;
+        final int bucketNum = adjustedCurrIndex >> div;
+        // load next bucket if needed
+        if (bucketNum != currentBucketIndex) {
+          final int offset = copy.getInt(offsetsPosition + ((bucketNum - 1) * 
Integer.BYTES));
+          copy.position(bucketsPosition + offset);
+          currentBucket = readBucket(
+              copy,
+              bucketNum < (numBuckets - 1) ? bucketSize : lastBucketNumValues
+          );
+          currentBucketIndex = bucketNum;
+        }
+        int offset = adjustedCurrIndex & rem;
+        currIndex++;
+        return currentBucket[offset];
+      }
+
+      @Override
+      public void remove()
+      {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+
+  @Override
+  public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+  {
+    inspector.visit("buffer", buffer);
+    inspector.visit("hasNulls", hasNull);
+    inspector.visit("bucketSize", bucketSize);
+  }
+
+  /**
+   * Get a value from a bucket at a relative position
+   */
+  public static ByteBuffer getFromBucket(ByteBuffer bucket, int offset)
+  {
+    final byte[] first = readBytes(bucket);
+    if (offset == 0) {
+      return ByteBuffer.wrap(first);
+    }
+    int pos = 0;
+    int prefixLength;
+    final byte[] fragment;
+    // scan through bucket values until we reach offset
+    do {
+      prefixLength = VByte.readInt(bucket);
+      if (++pos < offset) {
+        final int skipLength = VByte.readInt(bucket);
+        bucket.position(bucket.position() + skipLength);
+      } else {
+        fragment = readBytes(bucket);
+        break;
+      }
+    } while (true);
+    return combinePrefixAndFragment(first, prefixLength, fragment);
+  }
+
+  /**
+   * Find the relative position of a value in a bucket with a linear scan
+   */
+  public static int findInBucket(ByteBuffer bucket, int numValues, ByteBuffer 
value, Comparator<ByteBuffer> comparator)
+  {
+    final byte[] first = readBytes(bucket);
+    final int firstCompare = comparator.compare(ByteBuffer.wrap(first), value);
+    if (firstCompare == 0) {
+      return 0;
+    }
+    if (firstCompare > 0) {
+      return -1;
+    }
+    int offset = 0;
+    int prefixLength;
+    byte[] fragment;
+    // scan through bucket values until we find match or compare numValues
+    int insertionPoint = 1;
+    while (++offset < numValues) {
+      prefixLength = VByte.readInt(bucket);
+      fragment = readBytes(bucket);
+      ByteBuffer next = combinePrefixAndFragment(first, prefixLength, 
fragment);

Review Comment:
   Combining every time and then comparing seems quite wasteful.  Can we not 
push this comparison down to the buffer instead of copying the value out like 
this?
   
   For example, given that we know the target value, we should know the length 
of the prefix that matches it by this point.  We should be able to read the 
prefix length and check that they are the same; if they aren't, there is no 
match and we move on.  That only works if we care solely about equality, 
though.  If we also want ordered comparisons, we likely need to be a bit more 
intelligent, but we can still use known prefixes and other things pushed all 
the way down to the buffer instead of the naive "read, build buffer, compare 
via generic comparator" approach here.
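   
   A rough sketch of the kind of thing I have in mind.  `FrontCodedCompare` and 
its methods are hypothetical names, not code from this PR.  It assumes values 
are ordered as unsigned bytes, that `prefixLength` never exceeds the length of 
the bucket's first value, and that the caller has already decoded the prefix 
length, fragment position, and fragment length of the entry.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: compare a front-coded entry to a target without materializing it.
final class FrontCodedCompare
{
  /** Length of the longest common prefix of a and b. */
  static int commonPrefixLength(byte[] a, byte[] b)
  {
    final int limit = Math.min(a.length, b.length);
    int i = 0;
    while (i < limit && a[i] == b[i]) {
      i++;
    }
    return i;
  }

  /**
   * Compares the entry (first[0..prefixLength) followed by the fragment stored in the buffer)
   * to target in unsigned lexicographic order. sharedWithTarget must equal
   * commonPrefixLength(first, target) and is computed once per bucket.
   */
  static int compareEntry(
      byte[] first,
      int sharedWithTarget,
      int prefixLength,
      ByteBuffer buffer,
      int fragmentPosition,
      int fragmentLength,
      byte[] target
  )
  {
    if (prefixLength > sharedWithTarget) {
      // the entry reuses bytes of 'first' past the point where 'first' and 'target' diverge,
      // so the fragment never needs to be read
      if (sharedWithTarget == target.length) {
        return 1; // target is a strict prefix of the entry
      }
      return Integer.compare(first[sharedWithTarget] & 0xFF, target[sharedWithTarget] & 0xFF);
    }
    // the entry's prefix equals target[0..prefixLength); compare the fragment in place
    final int targetRemaining = target.length - prefixLength;
    final int limit = Math.min(fragmentLength, targetRemaining);
    for (int i = 0; i < limit; i++) {
      final int cmp = Integer.compare(
          buffer.get(fragmentPosition + i) & 0xFF,
          target[prefixLength + i] & 0xFF
      );
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(fragmentLength, targetRemaining);
  }
}
```

   The first branch is the "prefix lengths differ, move on" case: it decides 
the comparison from one byte of the bucket's first value and never touches the 
fragment.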



##########
processing/src/main/java/org/apache/druid/segment/DictionaryEncodedColumnMerger.java:
##########
@@ -384,7 +387,10 @@ public void writeIndexes(@Nullable List<IntBuffer> 
segmentRowNumConversions) thr
     }
   }
 
-
+  protected DictionaryWriter<T> getWriter(String fileName)

Review Comment:
   I have a suspicion that this is being done to enable changing the 
implementation of the dictionary.  It would be a *lot* nicer if we could find a 
way to do that via a constructor instead of by overriding a method that isn't 
even abstract.
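   
   Something along these lines, as a sketch only.  Every type here 
(`SketchDictionaryWriter`, `ListBackedWriter`, `SketchColumnMerger`) is a 
hypothetical stand-in, not the real `DictionaryWriter` or 
`DictionaryEncodedColumnMerger`; the point is just that the writer factory 
arrives through the constructor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for a dictionary writer; only what the sketch needs.
interface SketchDictionaryWriter<T>
{
  void write(T value);

  int size();
}

// Hypothetical in-memory writer used to exercise the sketch.
final class ListBackedWriter<T> implements SketchDictionaryWriter<T>
{
  final String fileName;
  final List<T> values = new ArrayList<>();

  ListBackedWriter(String fileName)
  {
    this.fileName = fileName;
  }

  @Override
  public void write(T value)
  {
    values.add(value);
  }

  @Override
  public int size()
  {
    return values.size();
  }
}

// The merger is handed its writer factory instead of exposing an overridable getWriter(String).
final class SketchColumnMerger<T>
{
  private final Function<String, SketchDictionaryWriter<T>> writerFactory;

  SketchColumnMerger(Function<String, SketchDictionaryWriter<T>> writerFactory)
  {
    this.writerFactory = writerFactory;
  }

  SketchDictionaryWriter<T> writeDictionary(String fileName, List<T> sortedValues)
  {
    final SketchDictionaryWriter<T> writer = writerFactory.apply(fileName);
    for (T value : sortedValues) {
      writer.write(value);
    }
    return writer;
  }
}
```

   Swapping the dictionary implementation then means passing a different 
factory at construction time, with no subclass involved.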



##########
processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexed.java:
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.data;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * {@link Indexed} specialized for storing variable-width binary values (such 
as utf8 encoded strings), which must be
+ * sorted and unique, using 'front coding'. Front coding is a type of delta 
encoding for byte arrays, where sorted
+ * values are grouped into buckets. The first value of the bucket is written 
entirely, and remaining values are stored
+ * as a pair of an integer which indicates how much of the first byte array of 
the bucket to use as a prefix, followed
+ * by the remaining bytes after the prefix to complete the value.
+ *
+ * Getting a value first picks the appropriate bucket, finds its offset in the 
underlying buffer, then scans the bucket
+ * values to seek to the correct position of the value within the bucket in 
order to reconstruct it using the prefix
+ * length.
+ *
+ * Finding the index of a value involves binary searching the first values of 
each bucket to find the correct bucket,
+ * then a linear scan within the bucket to find the matching value (or 
negative insertion point -1 for values that
+ * are not present).
+ *
+ * The value iterator reads an entire bucket at a time, reconstructing the 
values into an array to iterate within the
+ * bucket before moving onto the next bucket as the iterator is consumed.
+ */
+public final class FrontCodedIndexed implements Indexed<ByteBuffer>
+{
+  public static FrontCodedIndexed read(ByteBuffer buffer, 
Comparator<ByteBuffer> comparator, ByteOrder ordering)
+  {
+    final ByteBuffer copy = buffer.asReadOnlyBuffer().order(ordering);
+    final byte version = copy.get();
+    Preconditions.checkArgument(version == 0, "only V0 exists, encountered " + 
version);
+    final int bucketSize = copy.get();
+    final boolean hasNull = NullHandling.IS_NULL_BYTE == copy.get();
+    final int numValues = VByte.readInt(copy);
+    // size of offsets + values
+    final int size = VByte.readInt(copy);
+    // move position to end of buffer
+    buffer.position(copy.position() + size);
+
+    final int numBuckets = (int) Math.ceil((double) numValues / (double) 
bucketSize);
+    final int adjustIndex = hasNull ? 1 : 0;
+    final int div = Integer.numberOfTrailingZeros(bucketSize);
+    final int rem = bucketSize - 1;
+    return new FrontCodedIndexed(
+        copy,
+        comparator,
+        bucketSize,
+        numBuckets,
+        (numValues & rem) == 0 ? bucketSize : numValues & rem,
+        hasNull,
+        numValues + adjustIndex,
+        adjustIndex,
+        div,
+        rem,
+        copy.position(),
+        copy.position() + ((numBuckets - 1) * Integer.BYTES)
+    );
+  }
+
+  private final ByteBuffer buffer;
+  private final int adjustedNumValues;
+  private final int adjustIndex;
+  private final int bucketSize;
+  private final int numBuckets;
+  private final int div;
+  private final int rem;
+  private final int offsetsPosition;
+  private final int bucketsPosition;
+  private final boolean hasNull;
+  private final int lastBucketNumValues;
+  private final Comparator<ByteBuffer> comparator;
+
+  private FrontCodedIndexed(
+      ByteBuffer buffer,
+      Comparator<ByteBuffer> comparator,
+      int bucketSize,
+      int numBuckets,
+      int lastBucketNumValues,
+      boolean hasNull,
+      int adjustedNumValues,
+      int adjustIndex,
+      int div,
+      int rem,
+      int offsetsPosition,
+      int bucketsPosition
+  )
+  {
+    if (Integer.bitCount(bucketSize) != 1) {
+      throw new ISE("bucketSize must be a power of two but was[%,d]", 
bucketSize);
+    }
+    this.buffer = buffer;
+    this.comparator = comparator;
+    this.bucketSize = bucketSize;
+    this.hasNull = hasNull;
+    this.adjustedNumValues = adjustedNumValues;
+    this.adjustIndex = adjustIndex;
+    this.div = div;
+    this.rem = rem;
+    this.numBuckets = numBuckets;
+    this.offsetsPosition = offsetsPosition;
+    this.bucketsPosition = bucketsPosition;
+    this.lastBucketNumValues = lastBucketNumValues;
+  }
+
+  @Override
+  public int size()
+  {
+    return adjustedNumValues;
+  }
+
+  @Nullable
+  @Override
+  public ByteBuffer get(int index)
+  {
+    final int adjustedIndex;
+    // due to vbyte encoding, the null value is not actually stored in the 
bucket (no negative values), so we adjust
+    // the index
+    if (hasNull) {
+      if (index == 0) {
+        return null;
+      } else {
+        adjustedIndex = index - 1;
+      }
+    } else {
+      adjustedIndex = index;
+    }
+    // find the bucket which contains the value with maths
+    final int offsetNum = adjustedIndex >> div;
+    final int bucketIndex = adjustedIndex & rem;
+    // get offset of that bucket in the value buffer
+    final int offset = offsetNum > 0 ? buffer.getInt(offsetsPosition + 
((offsetNum - 1) * Integer.BYTES)) : 0;

Review Comment:
   Why `- 1`?



##########
processing/src/main/java/org/apache/druid/segment/column/IndexedStringDruidPredicateIndex.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.column;
+
+import com.google.common.base.Predicate;
+import org.apache.druid.collections.bitmap.BitmapFactory;
+import org.apache.druid.collections.bitmap.ImmutableBitmap;
+import org.apache.druid.query.filter.DruidPredicateFactory;
+import org.apache.druid.segment.data.Indexed;
+
+import javax.annotation.Nullable;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+public final class IndexedStringDruidPredicateIndex<T extends Indexed<String>>
+    extends BaseIndexedDictionaryEncodedIndex<String, T> implements 
DruidPredicateIndex

Review Comment:
   Here too, it appears that we are introducing a base class simply to store 
the fields and share state between parent and child.  There is no reason this 
class needs a parent class to store its fields for it.



##########
processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexed.java:
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.data;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * {@link Indexed} specialized for storing variable-width binary values (such 
as utf8 encoded strings), which must be
+ * sorted and unique, using 'front coding'. Front coding is a type of delta 
encoding for byte arrays, where sorted
+ * values are grouped into buckets. The first value of the bucket is written 
entirely, and remaining values are stored
+ * as a pair of an integer which indicates how much of the first byte array of 
the bucket to use as a prefix, followed
+ * by the remaining bytes after the prefix to complete the value.
+ *
+ * Getting a value first picks the appropriate bucket, finds its offset in the 
underlying buffer, then scans the bucket
+ * values to seek to the correct position of the value within the bucket in 
order to reconstruct it using the prefix
+ * length.
+ *
+ * Finding the index of a value involves binary searching the first values of 
each bucket to find the correct bucket,
+ * then a linear scan within the bucket to find the matching value (or 
negative insertion point -1 for values that

Review Comment:
   Does the code that uses this actually leverage the whole `negative insertion 
point` part?  That was a detail that leaked from the BinarySearch 
implementation and that, IIRC, was useful in only limited situations.  I'm 
wondering how important it really is to maintain that semantic.
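   
   For reference, the convention I mean is the one `java.util.Arrays.binarySearch` 
uses, where a miss returns `-(insertionPoint) - 1`.  A small sketch of what a 
caller has to do to make use of it; `InsertionPointSketch` is a made-up name, 
and I'm assuming `indexOf` here is meant to follow the same convention.

```java
import java.util.Arrays;

// Hypothetical helper illustrating the binarySearch-style return convention.
final class InsertionPointSketch
{
  /** Index of value in sorted, or -(insertionPoint) - 1 when it is absent. */
  static int indexOf(String[] sorted, String value)
  {
    return Arrays.binarySearch(sorted, value);
  }

  /** Recovers the insertion point from a negative result; passes hits through unchanged. */
  static int insertionPoint(int searchResult)
  {
    return searchResult >= 0 ? searchResult : -(searchResult + 1);
  }
}
```

   Callers that only test for presence just check `result >= 0` and never 
decode the negative value, which is why I'm asking who depends on it.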



##########
processing/src/main/java/org/apache/druid/segment/IndexSpec.java:
##########
@@ -116,37 +98,44 @@ public IndexSpec(
    * @param dimensionCompression compression format for dimension columns, 
null to use the default.
    *                             Defaults to {@link 
CompressionStrategy#DEFAULT_COMPRESSION_STRATEGY}
    *
+   * @param stringDictionaryEncoding encoding strategy for string dictionaries 
of dictionary encoded string columns
+   *
    * @param metricCompression compression format for primitive type metric 
columns, null to use the default.
    *                          Defaults to {@link 
CompressionStrategy#DEFAULT_COMPRESSION_STRATEGY}
    *
    * @param longEncoding encoding strategy for metric and dimension columns 
with type long, null to use the default.
    *                     Defaults to {@link 
CompressionFactory#DEFAULT_LONG_ENCODING_STRATEGY}
+   *
+   * @param segmentLoader specify a {@link SegmentizerFactory} which will be 
written to 'factory.json' and used to load
+   *                      the written segment
    */
   @JsonCreator
   public IndexSpec(
       @JsonProperty("bitmap") @Nullable BitmapSerdeFactory bitmapSerdeFactory,
       @JsonProperty("dimensionCompression") @Nullable CompressionStrategy 
dimensionCompression,
+      @JsonProperty("stringDictionaryEncoding") @Nullable 
StringEncodingStrategy stringDictionaryEncoding,

Review Comment:
   Is there a reason not to make it a configuration of the column itself?  I'm 
guessing this is being done out of convenience, but I'm hoping that it is 
relatively easy to make it a configuration of the column too.  It would be a 
lot easier to do apples-to-apples comparisons if we could have the exact same 
data encoded multiple different ways in different columns...



##########
processing/src/main/java/org/apache/druid/segment/column/IndexedStringDictionaryEncodedStringValueIndex.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.column;
+
+import org.apache.druid.collections.bitmap.BitmapFactory;
+import org.apache.druid.collections.bitmap.ImmutableBitmap;
+import org.apache.druid.segment.data.Indexed;
+
+import javax.annotation.Nullable;
+
+public final class IndexedStringDictionaryEncodedStringValueIndex<T extends 
Indexed<String>>

Review Comment:
   I don't understand the object structure introduced here.  The biggest 
"smell" is that the abstraction breaks good practice for abstract classes: 
fields are being shared across class boundaries.  The `dictionary` that is 
passed into the constructor of this class is stored as a field on the parent 
and then accessed here again.  Sharing state between parent and child classes 
is a big no-no; it always leads to sadness as the code evolves over time.
   
   That said, why do we need a base class anyway?  It seems simple enough to 
just have a concrete implementation that takes these same constructor 
parameters and implements `DictionaryEncodedStringValueIndex`, so I'd recommend 
eliminating the abstraction and just making the single class that we need.
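   
   Roughly what I'm picturing, as a sketch with simplified, hypothetical types: 
`SketchValueIndex` stands in for `DictionaryEncodedStringValueIndex`, and plain 
`List` and `BitSet` stand in for `Indexed` and the bitmap classes.  The method 
names and the empty-bitmap behaviour for out-of-range ids are assumptions for 
the example.

```java
import java.util.BitSet;
import java.util.List;

// Hypothetical, simplified stand-in for the value-index interface.
interface SketchValueIndex
{
  int getCardinality();

  String getValue(int index);

  BitSet getBitmap(int index);
}

// One concrete class that owns its fields; no abstract parent holds state for it.
final class SketchStringValueIndex implements SketchValueIndex
{
  private final List<String> dictionary;
  private final List<BitSet> bitmaps;

  SketchStringValueIndex(List<String> dictionary, List<BitSet> bitmaps)
  {
    this.dictionary = dictionary;
    this.bitmaps = bitmaps;
  }

  @Override
  public int getCardinality()
  {
    return dictionary.size();
  }

  @Override
  public String getValue(int index)
  {
    return dictionary.get(index);
  }

  @Override
  public BitSet getBitmap(int index)
  {
    // assumption for this sketch: unknown ids and missing bitmaps map to an empty bitmap
    if (index < 0 || index >= bitmaps.size()) {
      return new BitSet();
    }
    final BitSet bitmap = bitmaps.get(index);
    return bitmap == null ? new BitSet() : bitmap;
  }
}
```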



##########
processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java:
##########
@@ -71,6 +76,15 @@ public StringDimensionMergerV9(
   )
   {
     super(dimensionName, indexSpec, segmentWriteOutMedium, capabilities, 
progress, closer);
+    if (capabilities.hasSpatialIndexes()) {
+      Preconditions.checkArgument(
+          
StringEncodingStrategy.UTF8.equals(indexSpec.getStringDictionaryEncoding().getType()),
+          StringUtils.format(
+              "Spatial indexes are incompatible with [%s] encoded 
dictionaries",
+              indexSpec.getStringDictionaryEncoding().getType()
+          )
+      );

Review Comment:
   Is this basically saying that if there are spatial indexes in use, then the 
new encoding cannot be used?  That's sad if true and maybe more reason to make 
it a column-specific thing...



##########
processing/src/main/java/org/apache/druid/segment/column/IndexedUtf8LexicographicalRangeIndex.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.column;
+
+import com.google.common.base.Predicate;
+import it.unimi.dsi.fastutil.ints.IntIntImmutablePair;
+import it.unimi.dsi.fastutil.ints.IntIntPair;
+import it.unimi.dsi.fastutil.ints.IntIterator;
+import org.apache.druid.collections.bitmap.BitmapFactory;
+import org.apache.druid.collections.bitmap.ImmutableBitmap;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.segment.IntListUtils;
+import org.apache.druid.segment.data.Indexed;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+public final class IndexedUtf8LexicographicalRangeIndex<T extends 
Indexed<ByteBuffer>>

Review Comment:
   And, as I see more of these implementations, it feels like the thing that 
`BaseIndexedDictionaryEncodedIndex` is trying to provide is essentially shared 
code for `null` handling.  Instead of the parent/child relationship in this 
current code, I'd recommend just creating a `BitmapIndexed` or 
`IndexedOfBitmaps` or whatever class that takes an Indexed and provides the 
method for accessing the bitmaps that you want.  That's essentially what 
you are getting at with all of these.
   
   Perhaps the idea was to keep away from virtual function calls...  If so, 
let's just make sure all of these places depend on `BitmapIndexed` as a 
concrete class, not an interface.
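   
   A minimal sketch of that composition, under loud assumptions: `BitmapIndexed` 
and `ValueIndexSketch` are hypothetical names, `java.util.BitSet` stands in for 
`ImmutableBitmap`, and a plain `List` stands in for `Indexed`.  It is not the 
real Druid API; it only shows the missing/`null` bitmap handling living in one 
final helper class that index implementations hold as a field instead of 
inheriting it.

```java
import java.util.BitSet;
import java.util.Collections;
import java.util.List;

// Hypothetical helper (not Druid's API): owns the "missing or null bitmap" handling.
// BitSet stands in for ImmutableBitmap, List<BitSet> for Indexed<ImmutableBitmap>.
final class BitmapIndexed
{
  private final List<BitSet> bitmaps;

  BitmapIndexed(List<BitSet> bitmaps)
  {
    this.bitmaps = bitmaps;
  }

  // Returns the bitmap for a dictionary id, or an empty bitmap when the id is out of
  // range (for example a negative id from a failed lookup) or the stored slot is null.
  BitSet getBitmap(int dictionaryId)
  {
    if (dictionaryId < 0 || dictionaryId >= bitmaps.size()) {
      return new BitSet();
    }
    final BitSet bitmap = bitmaps.get(dictionaryId);
    return bitmap == null ? new BitSet() : bitmap;
  }
}

// An index implementation uses the helper by composition; no base class, no shared fields.
final class ValueIndexSketch
{
  private final List<String> sortedDictionary;
  private final BitmapIndexed bitmaps;

  ValueIndexSketch(List<String> sortedDictionary, BitmapIndexed bitmaps)
  {
    this.sortedDictionary = sortedDictionary;
    this.bitmaps = bitmaps;
  }

  BitSet forValue(String value)
  {
    // binarySearch returns a negative number when the value is absent
    return bitmaps.getBitmap(Collections.binarySearch(sortedDictionary, value));
  }
}
```

   Because `BitmapIndexed` is final and referenced by its concrete type, the 
JIT has a good chance of inlining `getBitmap`, which is the usual reason for 
avoiding an interface here.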



##########
processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexed.java:
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.data;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * {@link Indexed} specialized for storing variable-width binary values (such 
as utf8 encoded strings), which must be
+ * sorted and unique, using 'front coding'. Front coding is a type of delta 
encoding for byte arrays, where sorted
+ * values are grouped into buckets. The first value of the bucket is written 
entirely, and remaining values are stored
+ * as a pair of an integer which indicates how much of the first byte array of 
the bucket to use as a prefix, followed
+ * by the remaining bytes after the prefix to complete the value.
+ *
+ * Getting a value first picks the appropriate bucket, finds its offset in the 
underlying buffer, then scans the bucket
+ * values to seek to the correct position of the value within the bucket in 
order to reconstruct it using the prefix
+ * length.
+ *
+ * Finding the index of a value involves binary searching the first values of 
each bucket to find the correct bucket,
+ * then a linear scan within the bucket to find the matching value (or 
negative insertion point -1 for values that
+ * are not present).
+ *
+ * The value iterator reads an entire bucket at a time, reconstructing the 
values into an array to iterate within the
+ * bucket before moving onto the next bucket as the iterator is consumed.
+ */
+public final class FrontCodedIndexed implements Indexed<ByteBuffer>
+{
+  public static FrontCodedIndexed read(ByteBuffer buffer, 
Comparator<ByteBuffer> comparator, ByteOrder ordering)
+  {
+    final ByteBuffer copy = buffer.asReadOnlyBuffer().order(ordering);
+    final byte version = copy.get();
+    Preconditions.checkArgument(version == 0, "only V0 exists, encountered " + 
version);
+    final int bucketSize = copy.get();
+    final boolean hasNull = NullHandling.IS_NULL_BYTE == copy.get();
+    final int numValues = VByte.readInt(copy);
+    // size of offsets + values
+    final int size = VByte.readInt(copy);
+    // move position to end of buffer
+    buffer.position(copy.position() + size);
+
+    final int numBuckets = (int) Math.ceil((double) numValues / (double) 
bucketSize);
+    final int adjustIndex = hasNull ? 1 : 0;
+    final int div = Integer.numberOfTrailingZeros(bucketSize);
+    final int rem = bucketSize - 1;
+    return new FrontCodedIndexed(
+        copy,
+        comparator,
+        bucketSize,
+        numBuckets,
+        (numValues & rem) == 0 ? bucketSize : numValues & rem,
+        hasNull,
+        numValues + adjustIndex,
+        adjustIndex,
+        div,
+        rem,
+        copy.position(),
+        copy.position() + ((numBuckets - 1) * Integer.BYTES)
+    );
+  }
+
+  private final ByteBuffer buffer;
+  private final int adjustedNumValues;
+  private final int adjustIndex;
+  private final int bucketSize;
+  private final int numBuckets;
+  private final int div;
+  private final int rem;
+  private final int offsetsPosition;
+  private final int bucketsPosition;
+  private final boolean hasNull;
+  private final int lastBucketNumValues;
+  private final Comparator<ByteBuffer> comparator;
+
+  private FrontCodedIndexed(
+      ByteBuffer buffer,
+      Comparator<ByteBuffer> comparator,
+      int bucketSize,
+      int numBuckets,
+      int lastBucketNumValues,
+      boolean hasNull,
+      int adjustedNumValues,
+      int adjustIndex,
+      int div,
+      int rem,
+      int offsetsPosition,
+      int bucketsPosition
+  )
+  {
+    if (Integer.bitCount(bucketSize) != 1) {
+      throw new ISE("bucketSize must be a power of two but was[%,d]", 
bucketSize);
+    }
+    this.buffer = buffer;
+    this.comparator = comparator;
+    this.bucketSize = bucketSize;
+    this.hasNull = hasNull;
+    this.adjustedNumValues = adjustedNumValues;
+    this.adjustIndex = adjustIndex;
+    this.div = div;
+    this.rem = rem;
+    this.numBuckets = numBuckets;
+    this.offsetsPosition = offsetsPosition;
+    this.bucketsPosition = bucketsPosition;
+    this.lastBucketNumValues = lastBucketNumValues;
+  }
+
+  @Override
+  public int size()
+  {
+    return adjustedNumValues;
+  }
+
+  @Nullable
+  @Override
+  public ByteBuffer get(int index)
+  {
+    final int adjustedIndex;
+    // due to vbyte encoding, the null value is not actually stored in the 
bucket (no negative values), so we adjust
+    // the index
+    if (hasNull) {
+      if (index == 0) {
+        return null;
+      } else {
+        adjustedIndex = index - 1;
+      }
+    } else {
+      adjustedIndex = index;
+    }
+    // find the bucket which contains the value with maths
+    final int offsetNum = adjustedIndex >> div;
+    final int bucketIndex = adjustedIndex & rem;
+    // get offset of that bucket in the value buffer
+    final int offset = offsetNum > 0 ? buffer.getInt(offsetsPosition + 
((offsetNum - 1) * Integer.BYTES)) : 0;
+    ByteBuffer copy = buffer.asReadOnlyBuffer().order(buffer.order());
+    copy.position(bucketsPosition + offset);
+    return getFromBucket(copy, bucketIndex);
+  }
+
+  @Override
+  public int indexOf(@Nullable ByteBuffer value)
+  {
+    // performs binary search using the first values of each bucket to locate 
the appropriate bucket, and then does
+    // a linear scan to find the value within the bucket
+    ByteBuffer copy = buffer.asReadOnlyBuffer().order(buffer.order());
+    if (value == null) {
+      return hasNull ? 0 : -1;
+    }
+    int minBucketIndex = 0;
+    int maxBucketIndex = numBuckets - 1;
+    while (minBucketIndex < maxBucketIndex) {
+      int currBucketIndex = (minBucketIndex + maxBucketIndex) >>> 1;
+      int currBucketFirstValueIndex = currBucketIndex * bucketSize;
+
+      final ByteBuffer currBucketFirstValue = get(currBucketFirstValueIndex + 
adjustIndex);
+      // we compare against the adjacent bucket to determine if the value is 
actually in this bucket or if we need
+      // to keep searching buckets
+      final ByteBuffer nextBucketFirstValue = get(currBucketFirstValueIndex + 
bucketSize + adjustIndex);
+      int comparison = comparator.compare(currBucketFirstValue, value);
+      if (comparison == 0) {
+        // it turns out that the first value in current bucket is what we are 
looking for, short circuit
+        return currBucketFirstValueIndex + adjustIndex;
+      }
+
+      int comparisonNext = comparator.compare(nextBucketFirstValue, value);
+      if (comparison < 0 && comparisonNext > 0) {
+        // this is exactly the right bucket
+        final int offset = currBucketIndex > 0 ? copy.getInt(offsetsPosition + 
((currBucketIndex - 1) * Integer.BYTES)) : 0;
+        copy.position(bucketsPosition + offset);
+        // find the value in the bucket (or where it would be if it were 
present)
+        final int pos = findInBucket(copy, bucketSize, value, comparator);
+        if (pos < 0) {
+          return -currBucketFirstValueIndex + pos - adjustIndex;
+        }
+        return currBucketFirstValueIndex + pos + adjustIndex;
+      } else if (comparison < 0) {
+        minBucketIndex = currBucketIndex + 1;
+      } else {
+        maxBucketIndex = currBucketIndex - 1;
+      }
+    }
+
+    // this is where we ended up, try to find the value in the bucket
+    final int bucketIndexBase = minBucketIndex * bucketSize;
+    final int numValuesInBucket;
+    if (minBucketIndex == numBuckets - 1) {
+      numValuesInBucket = lastBucketNumValues;
+    } else {
+      numValuesInBucket = bucketSize;
+    }
+    final int offset;
+    if (minBucketIndex > 0) {
+      offset = copy.getInt(offsetsPosition + ((minBucketIndex - 1) * 
Integer.BYTES));
+    } else {
+      offset = 0;
+    }
+    copy.position(bucketsPosition + offset);
+    final int pos = findInBucket(copy, numValuesInBucket, value, comparator);
+    if (pos < 0) {
+      return -bucketIndexBase + pos - adjustIndex;
+    }
+    return bucketIndexBase + pos + adjustIndex;
+  }
+
+  @Override
+  public Iterator<ByteBuffer> iterator()
+  {
+    ByteBuffer copy = buffer.asReadOnlyBuffer().order(buffer.order());
+    copy.position(bucketsPosition);
+    final ByteBuffer[] firstBucket = readBucket(copy, numBuckets > 1 ? 
bucketSize : lastBucketNumValues);
+    // iterator decodes and buffers a bucket at a time, paging through buckets 
as the iterator is consumed
+    return new Iterator<ByteBuffer>()
+    {
+      private int currIndex = 0;
+      private int currentBucketIndex = 0;
+      private ByteBuffer[] currentBucket = firstBucket;
+
+      @Override
+      public boolean hasNext()
+      {
+        return currIndex < adjustedNumValues;
+      }
+
+      @Override
+      public ByteBuffer next()
+      {
+        // null is handled special
+        if (hasNull && currIndex == 0) {
+          currIndex++;
+          return null;
+        }
+        if (!hasNext()) {
+          throw new NoSuchElementException();
+        }
+        final int adjustedCurrIndex = hasNull ? currIndex - 1 : currIndex;
+        final int bucketNum = adjustedCurrIndex >> div;
+        // load next bucket if needed
+        if (bucketNum != currentBucketIndex) {
+          final int offset = copy.getInt(offsetsPosition + ((bucketNum - 1) * 
Integer.BYTES));
+          copy.position(bucketsPosition + offset);
+          currentBucket = readBucket(
+              copy,
+              bucketNum < (numBuckets - 1) ? bucketSize : lastBucketNumValues
+          );
+          currentBucketIndex = bucketNum;
+        }
+        int offset = adjustedCurrIndex & rem;
+        currIndex++;
+        return currentBucket[offset];
+      }
+
+      @Override
+      public void remove()
+      {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+
+  @Override
+  public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+  {
+    inspector.visit("buffer", buffer);
+    inspector.visit("hasNulls", hasNull);
+    inspector.visit("bucketSize", bucketSize);
+  }
+
+  /**
+   * Get a value from a bucket at a relative position
+   */
+  public static ByteBuffer getFromBucket(ByteBuffer bucket, int offset)
+  {
+    final byte[] first = readBytes(bucket);
+    if (offset == 0) {
+      return ByteBuffer.wrap(first);
+    }
+    int pos = 0;
+    int prefixLength;
+    final byte[] fragment;
+    // scan through bucket values until we reach offset
+    do {
+      prefixLength = VByte.readInt(bucket);
+      if (++pos < offset) {
+        final int skipLength = VByte.readInt(bucket);
+        bucket.position(bucket.position() + skipLength);
+      } else {
+        fragment = readBytes(bucket);
+        break;
+      }
+    } while (true);
+    return combinePrefixAndFragment(first, prefixLength, fragment);
+  }
+
+  /**
+   * Find the relative position of a value in a bucket with a linear scan
+   */
+  public static int findInBucket(ByteBuffer bucket, int numValues, ByteBuffer 
value, Comparator<ByteBuffer> comparator)
+  {
+    final byte[] first = readBytes(bucket);
+    final int firstCompare = comparator.compare(ByteBuffer.wrap(first), value);
+    if (firstCompare == 0) {
+      return 0;
+    }
+    if (firstCompare > 0) {
+      return -1;
+    }

Review Comment:
   When this is called from `indexOf()` this comparison and handling of it have 
already occurred.  Maybe not the end of the world, but why do it again?
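   
   One possible shape for avoiding the repeat, sketched over an already 
decoded bucket rather than the real front-coded bytes (so `BucketSearchSketch` 
and `findAfterFirst` are hypothetical names, and the array argument is an 
assumption for illustration).  It presumes the caller has already established 
that the first bucket value sorts before the target, which from the quoted 
diff appears to hold for the call inside the binary search loop; the call 
after the loop may still need the first-value check.

```java
import java.util.Comparator;

final class BucketSearchSketch
{
  // Hypothetical variant of findInBucket. Precondition: bucket[0] < value, as already
  // checked by the caller, so the linear scan starts at position 1.
  // Returns the position when found, otherwise -(insertionPoint + 1).
  static <T> int findAfterFirst(T[] bucket, T value, Comparator<T> comparator)
  {
    int insertionPoint = 1;
    for (int i = 1; i < bucket.length; i++) {
      final int cmp = comparator.compare(bucket[i], value);
      if (cmp == 0) {
        return i;
      }
      if (cmp > 0) {
        break;
      }
      insertionPoint++;
    }
    return -(insertionPoint + 1);
  }
}
```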



##########
processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexed.java:
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.data;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * {@link Indexed} specialized for storing variable-width binary values (such 
as utf8 encoded strings), which must be
+ * sorted and unique, using 'front coding'. Front coding is a type of delta 
encoding for byte arrays, where sorted
+ * values are grouped into buckets. The first value of the bucket is written 
entirely, and remaining values are stored
+ * as a pair of an integer which indicates how much of the first byte array of 
the bucket to use as a prefix, followed
+ * by the remaining bytes after the prefix to complete the value.
+ *
+ * Getting a value first picks the appropriate bucket, finds its offset in the 
underlying buffer, then scans the bucket
+ * values to seek to the correct position of the value within the bucket in 
order to reconstruct it using the prefix
+ * length.
+ *
+ * Finding the index of a value involves binary searching the first values of 
each bucket to find the correct bucket,
+ * then a linear scan within the bucket to find the matching value (or 
negative insertion point -1 for values that
+ * are not present).
+ *
+ * The value iterator reads an entire bucket at a time, reconstructing the 
values into an array to iterate within the
+ * bucket before moving onto the next bucket as the iterator is consumed.
+ */
+public final class FrontCodedIndexed implements Indexed<ByteBuffer>
+{
+  public static FrontCodedIndexed read(ByteBuffer buffer, 
Comparator<ByteBuffer> comparator, ByteOrder ordering)
+  {
+    final ByteBuffer copy = buffer.asReadOnlyBuffer().order(ordering);
+    final byte version = copy.get();
+    Preconditions.checkArgument(version == 0, "only V0 exists, encountered " + 
version);
+    final int bucketSize = copy.get();
+    final boolean hasNull = NullHandling.IS_NULL_BYTE == copy.get();
+    final int numValues = VByte.readInt(copy);
+    // size of offsets + values
+    final int size = VByte.readInt(copy);
+    // move position to end of buffer
+    buffer.position(copy.position() + size);
+
+    final int numBuckets = (int) Math.ceil((double) numValues / (double) 
bucketSize);
+    final int adjustIndex = hasNull ? 1 : 0;
+    final int div = Integer.numberOfTrailingZeros(bucketSize);
+    final int rem = bucketSize - 1;
+    return new FrontCodedIndexed(
+        copy,
+        comparator,
+        bucketSize,
+        numBuckets,
+        (numValues & rem) == 0 ? bucketSize : numValues & rem,
+        hasNull,
+        numValues + adjustIndex,
+        adjustIndex,
+        div,
+        rem,
+        copy.position(),
+        copy.position() + ((numBuckets - 1) * Integer.BYTES)
+    );
+  }
+
+  private final ByteBuffer buffer;
+  private final int adjustedNumValues;
+  private final int adjustIndex;
+  private final int bucketSize;
+  private final int numBuckets;
+  private final int div;
+  private final int rem;
+  private final int offsetsPosition;
+  private final int bucketsPosition;
+  private final boolean hasNull;
+  private final int lastBucketNumValues;
+  private final Comparator<ByteBuffer> comparator;
+
+  private FrontCodedIndexed(
+      ByteBuffer buffer,
+      Comparator<ByteBuffer> comparator,
+      int bucketSize,
+      int numBuckets,
+      int lastBucketNumValues,
+      boolean hasNull,
+      int adjustedNumValues,
+      int adjustIndex,
+      int div,
+      int rem,
+      int offsetsPosition,
+      int bucketsPosition
+  )
+  {
+    if (Integer.bitCount(bucketSize) != 1) {
+      throw new ISE("bucketSize must be a power of two but was[%,d]", 
bucketSize);
+    }
+    this.buffer = buffer;
+    this.comparator = comparator;
+    this.bucketSize = bucketSize;
+    this.hasNull = hasNull;
+    this.adjustedNumValues = adjustedNumValues;
+    this.adjustIndex = adjustIndex;
+    this.div = div;
+    this.rem = rem;
+    this.numBuckets = numBuckets;
+    this.offsetsPosition = offsetsPosition;
+    this.bucketsPosition = bucketsPosition;
+    this.lastBucketNumValues = lastBucketNumValues;
+  }
+
+  @Override
+  public int size()
+  {
+    return adjustedNumValues;
+  }
+
+  @Nullable
+  @Override
+  public ByteBuffer get(int index)
+  {
+    final int adjustedIndex;
+    // due to vbyte encoding, the null value is not actually stored in the 
bucket (no negative values), so we adjust
+    // the index
+    if (hasNull) {
+      if (index == 0) {
+        return null;
+      } else {
+        adjustedIndex = index - 1;
+      }
+    } else {
+      adjustedIndex = index;
+    }
+    // find the bucket which contains the value with maths
+    final int offsetNum = adjustedIndex >> div;
+    final int bucketIndex = adjustedIndex & rem;
+    // get offset of that bucket in the value buffer
+    final int offset = offsetNum > 0 ? buffer.getInt(offsetsPosition + 
((offsetNum - 1) * Integer.BYTES)) : 0;
+    ByteBuffer copy = buffer.asReadOnlyBuffer().order(buffer.order());
+    copy.position(bucketsPosition + offset);
+    return getFromBucket(copy, bucketIndex);
+  }
+
+  @Override
+  public int indexOf(@Nullable ByteBuffer value)
+  {
+    // performs binary search using the first values of each bucket to locate 
the appropriate bucket, and then does
+    // a linear scan to find the value within the bucket
+    ByteBuffer copy = buffer.asReadOnlyBuffer().order(buffer.order());
+    if (value == null) {
+      return hasNull ? 0 : -1;
+    }
+    int minBucketIndex = 0;
+    int maxBucketIndex = numBuckets - 1;
+    while (minBucketIndex < maxBucketIndex) {
+      int currBucketIndex = (minBucketIndex + maxBucketIndex) >>> 1;
+      int currBucketFirstValueIndex = currBucketIndex * bucketSize;
+
+      final ByteBuffer currBucketFirstValue = get(currBucketFirstValueIndex + 
adjustIndex);
+      // we compare against the adjacent bucket to determine if the value is 
actually in this bucket or if we need
+      // to keep searching buckets
+      final ByteBuffer nextBucketFirstValue = get(currBucketFirstValueIndex + 
bucketSize + adjustIndex);
+      int comparison = comparator.compare(currBucketFirstValue, value);
+      if (comparison == 0) {
+        // it turns out that the first value in current bucket is what we are 
looking for, short circuit
+        return currBucketFirstValueIndex + adjustIndex;
+      }
+
+      int comparisonNext = comparator.compare(nextBucketFirstValue, value);
+      if (comparison < 0 && comparisonNext > 0) {
+        // this is exactly the right bucket
+        final int offset = currBucketIndex > 0 ? copy.getInt(offsetsPosition + 
((currBucketIndex - 1) * Integer.BYTES)) : 0;
+        copy.position(bucketsPosition + offset);
+        // find the value in the bucket (or where it would be if it were 
present)
+        final int pos = findInBucket(copy, bucketSize, value, comparator);
+        if (pos < 0) {
+          return -currBucketFirstValueIndex + pos - adjustIndex;
+        }
+        return currBucketFirstValueIndex + pos + adjustIndex;
+      } else if (comparison < 0) {
+        minBucketIndex = currBucketIndex + 1;
+      } else {
+        maxBucketIndex = currBucketIndex - 1;
+      }
+    }
+
+    // this is where we ended up, try to find the value in the bucket
+    final int bucketIndexBase = minBucketIndex * bucketSize;
+    final int numValuesInBucket;
+    if (minBucketIndex == numBuckets - 1) {
+      numValuesInBucket = lastBucketNumValues;
+    } else {
+      numValuesInBucket = bucketSize;
+    }
+    final int offset;
+    if (minBucketIndex > 0) {
+      offset = copy.getInt(offsetsPosition + ((minBucketIndex - 1) * 
Integer.BYTES));
+    } else {
+      offset = 0;
+    }
+    copy.position(bucketsPosition + offset);
+    final int pos = findInBucket(copy, numValuesInBucket, value, comparator);
+    if (pos < 0) {
+      return -bucketIndexBase + pos - adjustIndex;
+    }
+    return bucketIndexBase + pos + adjustIndex;
+  }
+
+  @Override
+  public Iterator<ByteBuffer> iterator()
+  {
+    ByteBuffer copy = buffer.asReadOnlyBuffer().order(buffer.order());
+    copy.position(bucketsPosition);
+    final ByteBuffer[] firstBucket = readBucket(copy, numBuckets > 1 ? 
bucketSize : lastBucketNumValues);
+    // iterator decodes and buffers a bucket at a time, paging through buckets 
as the iterator is consumed
+    return new Iterator<ByteBuffer>()
+    {
+      private int currIndex = 0;
+      private int currentBucketIndex = 0;
+      private ByteBuffer[] currentBucket = firstBucket;
+
+      @Override
+      public boolean hasNext()
+      {
+        return currIndex < adjustedNumValues;
+      }
+
+      @Override
+      public ByteBuffer next()
+      {
+        // null is handled special
+        if (hasNull && currIndex == 0) {
+          currIndex++;
+          return null;
+        }
+        if (!hasNext()) {
+          throw new NoSuchElementException();
+        }
+        final int adjustedCurrIndex = hasNull ? currIndex - 1 : currIndex;
+        final int bucketNum = adjustedCurrIndex >> div;
+        // load next bucket if needed
+        if (bucketNum != currentBucketIndex) {
+          final int offset = copy.getInt(offsetsPosition + ((bucketNum - 1) * 
Integer.BYTES));
+          copy.position(bucketsPosition + offset);
+          currentBucket = readBucket(
+              copy,
+              bucketNum < (numBuckets - 1) ? bucketSize : lastBucketNumValues
+          );
+          currentBucketIndex = bucketNum;
+        }
+        int offset = adjustedCurrIndex & rem;
+        currIndex++;
+        return currentBucket[offset];
+      }
+
+      @Override
+      public void remove()
+      {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+
+  @Override
+  public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+  {
+    inspector.visit("buffer", buffer);
+    inspector.visit("hasNulls", hasNull);
+    inspector.visit("bucketSize", bucketSize);
+  }
+
+  /**
+   * Get a value from a bucket at a relative position
+   */
+  public static ByteBuffer getFromBucket(ByteBuffer bucket, int offset)
+  {
+    final byte[] first = readBytes(bucket);
+    if (offset == 0) {
+      return ByteBuffer.wrap(first);
+    }
+    int pos = 0;
+    int prefixLength;
+    final byte[] fragment;
+    // scan through bucket values until we reach offset
+    do {
+      prefixLength = VByte.readInt(bucket);
+      if (++pos < offset) {
+        final int skipLength = VByte.readInt(bucket);
+        bucket.position(bucket.position() + skipLength);
+      } else {
+        fragment = readBytes(bucket);
+        break;
+      }
+    } while (true);
+    return combinePrefixAndFragment(first, prefixLength, fragment);
+  }
+
+  /**
+   * Find the relative position of a value in a bucket with a linear scan
+   */
+  public static int findInBucket(ByteBuffer bucket, int numValues, ByteBuffer 
value, Comparator<ByteBuffer> comparator)
+  {
+    final byte[] first = readBytes(bucket);
+    final int firstCompare = comparator.compare(ByteBuffer.wrap(first), value);
+    if (firstCompare == 0) {
+      return 0;
+    }
+    if (firstCompare > 0) {
+      return -1;
+    }
+    int offset = 0;
+    int prefixLength;
+    byte[] fragment;
+    // scan through bucket values until we find match or compare numValues
+    int insertionPoint = 1;
+    while (++offset < numValues) {
+      prefixLength = VByte.readInt(bucket);
+      fragment = readBytes(bucket);
+      ByteBuffer next = combinePrefixAndFragment(first, prefixLength, 
fragment);
+      final int cmp = comparator.compare(next, value);
+      if (cmp == 0) {
+        return offset;
+      } else if (cmp < 0) {
+        insertionPoint++;
+      } else {
+        break;
+      }
+    }
+    return -(insertionPoint + 1);
+  }
+
+  /**
+   * reconstruct a value given the bytes of the first bucket value, the length 
of the first value to use as a prefix,
+   * and the fragment of the value to use after the prefix.
+   */
+  private static ByteBuffer combinePrefixAndFragment(byte[] first, int 
prefixLength, byte[] fragment)
+  {
+    ByteBuffer next = ByteBuffer.allocate(prefixLength + fragment.length);

Review Comment:
   In reading this code, I find myself wondering if this class wouldn't be 
better off with a Buffer that it reuses, where callers of this class need to be 
aware that the Buffer gets reused (so they must make a copy of the contents if 
they want to hang onto a reference to it).  
   
   In general, the buffers returned from things get reused, so I'm pretty sure 
most of the client code should be fine with this semantic (e.g. the `String` 
indexed wrapper would consume and move on really fast) and it would reduce 
object allocations.
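   
   A minimal sketch of the reuse idea, assuming a hypothetical 
`ReusableValueBuffer` holder (not part of the PR) whose `combine` mirrors the 
arguments of `combinePrefixAndFragment`.  The returned buffer is only valid 
until the next call, so a caller that wants to keep the value has to copy it 
or decode it right away.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical holder for a scratch buffer that is reused across reads.
final class ReusableValueBuffer
{
  private ByteBuffer scratch = ByteBuffer.allocate(16);

  // Rebuilds prefix + fragment into the shared scratch buffer instead of allocating.
  // The result is overwritten by the next call to combine().
  ByteBuffer combine(byte[] first, int prefixLength, byte[] fragment)
  {
    final int length = prefixLength + fragment.length;
    if (scratch.capacity() < length) {
      // grow only when a value does not fit
      scratch = ByteBuffer.allocate(Math.max(length, scratch.capacity() * 2));
    }
    scratch.clear();
    scratch.put(first, 0, prefixLength);
    scratch.put(fragment);
    scratch.flip();
    return scratch;
  }

  // What a String wrapper would do: decode immediately, leaving the shared buffer untouched.
  static String asString(ByteBuffer value)
  {
    return StandardCharsets.UTF_8.decode(value.duplicate()).toString();
  }
}
```

   A shared mutable buffer like this is not safe to use from multiple threads, 
so each reader would presumably need its own instance.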



##########
processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexed.java:
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.data;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * {@link Indexed} specialized for storing variable-width binary values (such 
as utf8 encoded strings), which must be
+ * sorted and unique, using 'front coding'. Front coding is a type of delta 
encoding for byte arrays, where sorted
+ * values are grouped into buckets. The first value of the bucket is written 
entirely, and remaining values are stored
+ * as a pair of an integer which indicates how much of the first byte array of 
the bucket to use as a prefix, followed
+ * by the remaining bytes after the prefix to complete the value.
+ *
+ * Getting a value first picks the appropriate bucket, finds its offset in the 
underlying buffer, then scans the bucket
+ * values to seek to the correct position of the value within the bucket in 
order to reconstruct it using the prefix
+ * length.
+ *
+ * Finding the index of a value involves binary searching the first values of 
each bucket to find the correct bucket,
+ * then a linear scan within the bucket to find the matching value (or 
negative insertion point -1 for values that
+ * are not present).
+ *
+ * The value iterator reads an entire bucket at a time, reconstructing the 
values into an array to iterate within the
+ * bucket before moving onto the next bucket as the iterator is consumed.
+ */
+public final class FrontCodedIndexed implements Indexed<ByteBuffer>
+{
+  public static FrontCodedIndexed read(ByteBuffer buffer, 
Comparator<ByteBuffer> comparator, ByteOrder ordering)
+  {
+    final ByteBuffer copy = buffer.asReadOnlyBuffer().order(ordering);
+    final byte version = copy.get();
+    Preconditions.checkArgument(version == 0, "only V0 exists, encountered " + 
version);
+    final int bucketSize = copy.get();
+    final boolean hasNull = NullHandling.IS_NULL_BYTE == copy.get();
+    final int numValues = VByte.readInt(copy);
+    // size of offsets + values
+    final int size = VByte.readInt(copy);
+    // move position to end of buffer
+    buffer.position(copy.position() + size);
+
+    final int numBuckets = (int) Math.ceil((double) numValues / (double) bucketSize);
+    final int adjustIndex = hasNull ? 1 : 0;
+    final int div = Integer.numberOfTrailingZeros(bucketSize);
+    final int rem = bucketSize - 1;
+    return new FrontCodedIndexed(
+        copy,
+        comparator,
+        bucketSize,
+        numBuckets,
+        (numValues & rem) == 0 ? bucketSize : numValues & rem,
+        hasNull,
+        numValues + adjustIndex,
+        adjustIndex,
+        div,
+        rem,
+        copy.position(),
+        copy.position() + ((numBuckets - 1) * Integer.BYTES)
+    );
+  }
+
+  private final ByteBuffer buffer;
+  private final int adjustedNumValues;
+  private final int adjustIndex;
+  private final int bucketSize;
+  private final int numBuckets;
+  private final int div;
+  private final int rem;
+  private final int offsetsPosition;
+  private final int bucketsPosition;
+  private final boolean hasNull;
+  private final int lastBucketNumValues;
+  private final Comparator<ByteBuffer> comparator;
+
+  private FrontCodedIndexed(
+      ByteBuffer buffer,
+      Comparator<ByteBuffer> comparator,
+      int bucketSize,
+      int numBuckets,
+      int lastBucketNumValues,
+      boolean hasNull,
+      int adjustedNumValues,
+      int adjustIndex,
+      int div,
+      int rem,
+      int offsetsPosition,
+      int bucketsPosition
+  )
+  {
+    if (Integer.bitCount(bucketSize) != 1) {
+      throw new ISE("bucketSize must be a power of two but was[%,d]", bucketSize);
+    }
+    this.buffer = buffer;
+    this.comparator = comparator;
+    this.bucketSize = bucketSize;
+    this.hasNull = hasNull;
+    this.adjustedNumValues = adjustedNumValues;
+    this.adjustIndex = adjustIndex;
+    this.div = div;
+    this.rem = rem;
+    this.numBuckets = numBuckets;
+    this.offsetsPosition = offsetsPosition;
+    this.bucketsPosition = bucketsPosition;
+    this.lastBucketNumValues = lastBucketNumValues;
+  }
+
+  @Override
+  public int size()
+  {
+    return adjustedNumValues;
+  }
+
+  @Nullable
+  @Override
+  public ByteBuffer get(int index)
+  {
+    final int adjustedIndex;
+    // due to vbyte encoding, the null value is not actually stored in the bucket (no negative values), so we adjust
+    // the index
+    if (hasNull) {
+      if (index == 0) {
+        return null;
+      } else {
+        adjustedIndex = index - 1;
+      }
+    } else {
+      adjustedIndex = index;
+    }
+    // find the bucket which contains the value with maths
+    final int offsetNum = adjustedIndex >> div;
+    final int bucketIndex = adjustedIndex & rem;
+    // get offset of that bucket in the value buffer
+    final int offset = offsetNum > 0 ? buffer.getInt(offsetsPosition + ((offsetNum - 1) * Integer.BYTES)) : 0;
+    ByteBuffer copy = buffer.asReadOnlyBuffer().order(buffer.order());
+    copy.position(bucketsPosition + offset);
+    return getFromBucket(copy, bucketIndex);
+  }
+
+  @Override
+  public int indexOf(@Nullable ByteBuffer value)
+  {
+    // performs binary search using the first values of each bucket to locate the appropriate bucket, and then does
+    // a linear scan to find the value within the bucket
+    ByteBuffer copy = buffer.asReadOnlyBuffer().order(buffer.order());
+    if (value == null) {
+      return hasNull ? 0 : -1;
+    }
+    int minBucketIndex = 0;
+    int maxBucketIndex = numBuckets - 1;
+    while (minBucketIndex < maxBucketIndex) {
+      int currBucketIndex = (minBucketIndex + maxBucketIndex) >>> 1;
+      int currBucketFirstValueIndex = currBucketIndex * bucketSize;
+
+      final ByteBuffer currBucketFirstValue = get(currBucketFirstValueIndex + adjustIndex);
+      // we compare against the adjacent bucket to determine if the value is actually in this bucket or if we need
+      // to keep searching buckets
+      final ByteBuffer nextBucketFirstValue = get(currBucketFirstValueIndex + bucketSize + adjustIndex);

Review Comment:
   Delegating to `get()` like this creates a bunch of extra objects, because 
it reads the value into a `byte[]` and then wraps that in a `ByteBuffer` just 
to return it.  You are only comparing here, so you should be able to compare 
bytes directly using the `copy` that you already created, without creating 
any new objects.  Please rewrite this so it is not based on `get()` and the 
comparison is pushed down to the `copy` buffer instead.
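
To make the suggestion concrete, here is a minimal sketch of an allocation-free comparison. It is not Druid's actual code: the class name `InPlaceCompare`, the `compare` signature, and the assumption that the stored value's absolute offset and length are already known (for example, after decoding a length prefix) are all hypothetical. It also assumes the ordering is unsigned lexicographic byte order, which may differ from the `comparator` passed to `FrontCodedIndexed`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Druid: compares stored bytes against a
// search value in place, without materializing a byte[] or a new ByteBuffer.
final class InPlaceCompare
{
  /**
   * Compares {@code length} bytes of {@code buffer}, starting at absolute
   * position {@code offset}, with the remaining bytes of {@code value}.
   * Bytes are treated as unsigned and compared lexicographically (assumed
   * ordering); a shorter value sorts before a longer one sharing its prefix.
   */
  static int compare(ByteBuffer buffer, int offset, int length, ByteBuffer value)
  {
    final int valuePosition = value.position();
    final int valueLength = value.remaining();
    final int common = Math.min(length, valueLength);
    for (int i = 0; i < common; i++) {
      // absolute gets leave both buffers' positions untouched
      final int cmp = Integer.compare(
          buffer.get(offset + i) & 0xFF,
          value.get(valuePosition + i) & 0xFF
      );
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(length, valueLength);
  }

  public static void main(String[] args)
  {
    // two values stored back to back: "apple" at [0, 5) and "banana" at [5, 11)
    final ByteBuffer store = ByteBuffer.wrap("applebanana".getBytes(StandardCharsets.UTF_8));
    final ByteBuffer search = ByteBuffer.wrap("banana".getBytes(StandardCharsets.UTF_8));
    System.out.println(compare(store, 0, 5, search)); // negative: "apple" < "banana"
    System.out.println(compare(store, 5, 6, search)); // 0: equal
  }
}
```

Because only absolute `get(int)` calls are used, the binary search could probe bucket first values repeatedly without creating garbage and without disturbing the position of the shared buffer.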



##########
processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexed.java:
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.data;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * {@link Indexed} specialized for storing variable-width binary values (such as utf8 encoded strings), which must be
+ * sorted and unique, using 'front coding'. Front coding is a type of delta encoding for byte arrays, where sorted
+ * values are grouped into buckets. The first value of the bucket is written entirely, and remaining values are stored
+ * as a pair of an integer which indicates how much of the first byte array of the bucket to use as a prefix, followed
+ * by the remaining bytes after the prefix to complete the value.
+ *
+ * Getting a value first picks the appropriate bucket, finds its offset in the underlying buffer, then scans the bucket
+ * values to seek to the correct position of the value within the bucket in order to reconstruct it using the prefix
+ * length.
+ *
+ * Finding the index of a value involves binary searching the first values of each bucket to find the correct bucket,
+ * then a linear scan within the bucket to find the matching value (or negative insertion point -1 for values that
+ * are not present).
+ *
+ * The value iterator reads an entire bucket at a time, reconstructing the values into an array to iterate within the
+ * bucket before moving onto the next bucket as the iterator is consumed.
+ */
+public final class FrontCodedIndexed implements Indexed<ByteBuffer>
+{
+  public static FrontCodedIndexed read(ByteBuffer buffer, Comparator<ByteBuffer> comparator, ByteOrder ordering)
+  {
+    final ByteBuffer copy = buffer.asReadOnlyBuffer().order(ordering);
+    final byte version = copy.get();
+    Preconditions.checkArgument(version == 0, "only V0 exists, encountered " + version);
+    final int bucketSize = copy.get();
+    final boolean hasNull = NullHandling.IS_NULL_BYTE == copy.get();
+    final int numValues = VByte.readInt(copy);
+    // size of offsets + values
+    final int size = VByte.readInt(copy);
+    // move position to end of buffer
+    buffer.position(copy.position() + size);
+
+    final int numBuckets = (int) Math.ceil((double) numValues / (double) bucketSize);
+    final int adjustIndex = hasNull ? 1 : 0;
+    final int div = Integer.numberOfTrailingZeros(bucketSize);
+    final int rem = bucketSize - 1;
+    return new FrontCodedIndexed(
+        copy,
+        comparator,
+        bucketSize,
+        numBuckets,
+        (numValues & rem) == 0 ? bucketSize : numValues & rem,
+        hasNull,
+        numValues + adjustIndex,
+        adjustIndex,
+        div,
+        rem,
+        copy.position(),
+        copy.position() + ((numBuckets - 1) * Integer.BYTES)
+    );
+  }
+
+  private final ByteBuffer buffer;
+  private final int adjustedNumValues;
+  private final int adjustIndex;
+  private final int bucketSize;
+  private final int numBuckets;
+  private final int div;
+  private final int rem;
+  private final int offsetsPosition;
+  private final int bucketsPosition;
+  private final boolean hasNull;
+  private final int lastBucketNumValues;
+  private final Comparator<ByteBuffer> comparator;
+
+  private FrontCodedIndexed(
+      ByteBuffer buffer,
+      Comparator<ByteBuffer> comparator,
+      int bucketSize,
+      int numBuckets,
+      int lastBucketNumValues,
+      boolean hasNull,
+      int adjustedNumValues,
+      int adjustIndex,
+      int div,
+      int rem,
+      int offsetsPosition,
+      int bucketsPosition
+  )
+  {
+    if (Integer.bitCount(bucketSize) != 1) {
+      throw new ISE("bucketSize must be a power of two but was[%,d]", bucketSize);
+    }
+    this.buffer = buffer;
+    this.comparator = comparator;
+    this.bucketSize = bucketSize;
+    this.hasNull = hasNull;
+    this.adjustedNumValues = adjustedNumValues;
+    this.adjustIndex = adjustIndex;
+    this.div = div;
+    this.rem = rem;
+    this.numBuckets = numBuckets;
+    this.offsetsPosition = offsetsPosition;
+    this.bucketsPosition = bucketsPosition;
+    this.lastBucketNumValues = lastBucketNumValues;
+  }
+
+  @Override
+  public int size()
+  {
+    return adjustedNumValues;
+  }
+
+  @Nullable
+  @Override
+  public ByteBuffer get(int index)
+  {
+    final int adjustedIndex;
+    // due to vbyte encoding, the null value is not actually stored in the bucket (no negative values), so we adjust
+    // the index

Review Comment:
   Why not just force `0` == `null` in all cases?  It seems a lot simpler to 
have the dictionary id of `0` always equal `null` than to shift the indexes 
around in some cases and not in others.
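
As a rough illustration of what "id `0` is always `null`" could look like, here is a small sketch over a plain sorted `String[]` rather than the front-coded buffer. The class `NullFirstDictionary` and its methods are hypothetical and only mirror the shape of `size()`, `get()` and `indexOf()`; the negative return value follows the `Arrays.binarySearch` convention of `-(insertion point) - 1`, expressed in dictionary ids.

```java
import java.util.Arrays;

// Hypothetical sketch, not Druid code: a dictionary where id 0 is
// unconditionally reserved for null and stored values start at id 1.
final class NullFirstDictionary
{
  private final String[] sortedValues; // sorted, unique, non-null

  NullFirstDictionary(String[] sortedValues)
  {
    this.sortedValues = sortedValues;
  }

  int size()
  {
    // one extra slot for the null entry, whether or not nulls occur
    return sortedValues.length + 1;
  }

  String get(int id)
  {
    return id == 0 ? null : sortedValues[id - 1];
  }

  int indexOf(String value)
  {
    if (value == null) {
      return 0;
    }
    final int pos = Arrays.binarySearch(sortedValues, value);
    // found: shift by one to skip the null slot.
    // missing: pos is -(insertion point) - 1 in array positions; the
    // insertion point in dictionary ids is one higher, giving pos - 1.
    return pos >= 0 ? pos + 1 : pos - 1;
  }

  public static void main(String[] args)
  {
    final NullFirstDictionary dictionary = new NullFirstDictionary(new String[]{"a", "c"});
    System.out.println(dictionary.get(0));       // null
    System.out.println(dictionary.indexOf("c")); // 2
    System.out.println(dictionary.indexOf("b")); // -3
  }
}
```

The shift is then a constant rather than a per-instance `adjustIndex`, at the cost of spending id `0` even for columns that contain no nulls; whether that trade-off is acceptable is a question for the PR, not something this sketch settles.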


