paul-rogers commented on code in PR #12277:
URL: https://github.com/apache/druid/pull/12277#discussion_r1002259712


##########
processing/src/main/java/org/apache/druid/segment/column/IndexedUtf8ValueSetIndex.java:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.column;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import org.apache.druid.collections.bitmap.BitmapFactory;
+import org.apache.druid.collections.bitmap.ImmutableBitmap;
+import org.apache.druid.java.util.common.ByteBufferUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.BitmapResultFactory;
+import org.apache.druid.segment.data.Indexed;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.SortedSet;
+
+public final class IndexedUtf8ValueSetIndex<TDictionary extends 
Indexed<ByteBuffer>>
+    implements StringValueSetIndex, Utf8ValueSetIndex
+{
+  // This determines the cut-off point to swtich the merging algorithm from 
doing binary-search per element in the value

Review Comment:
   swtich -> switch



##########
processing/src/main/java/org/apache/druid/segment/column/StringEncodingStrategies.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.column;
+
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.data.DictionaryWriter;
+import org.apache.druid.segment.data.EncodedStringDictionaryWriter;
+import org.apache.druid.segment.data.FrontCodedIndexedWriter;
+import org.apache.druid.segment.data.GenericIndexed;
+import org.apache.druid.segment.data.GenericIndexedWriter;
+import org.apache.druid.segment.data.Indexed;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Iterator;
+
+public class StringEncodingStrategies
+{
+  public static DictionaryWriter<String> getStringDictionaryWriter(
+      StringEncodingStrategy encodingStrategy,
+      SegmentWriteOutMedium writeoutMedium,
+      String fileName
+  )
+  {
+    // write plain utf8 in the legacy format, where generic indexed was 
written directly
+    if (StringEncodingStrategy.UTF8.equals(encodingStrategy.getType())) {
+      return new GenericIndexedWriter<>(writeoutMedium, fileName, 
GenericIndexed.STRING_STRATEGY);
+    } else {
+      // otherwise, we wrap in an EncodedStringDictionaryWriter so that we 
write a small header that includes
+      // a version byte that should hopefully never conflict with a 
GenericIndexed version, along with a byte
+      // from StringEncodingStrategy.getId to indicate which encoding strategy 
is used for the dictionary before
+      // writing the dictionary itself
+      DictionaryWriter<byte[]> writer;
+      if 
(StringEncodingStrategy.FRONT_CODED.equals(encodingStrategy.getType())) {
+        writer = new FrontCodedIndexedWriter(
+            writeoutMedium,
+            ByteOrder.nativeOrder(),
+            ((StringEncodingStrategy.FrontCoded) 
encodingStrategy).getBucketSize()
+        );
+      } else {
+        throw new ISE("Unknown encoding strategy: %s", 
encodingStrategy.getType());
+      }
+      return new EncodedStringDictionaryWriter(writer, encodingStrategy);
+    }
+  }
+
+  /**
+   * Adapter to convert {@link Indexed<ByteBuffer>} with utf8 encoded bytes 
into {@link Indexed<String>} to be frinedly

Review Comment:
   frinedly -> friendly



##########
processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexed.java:
##########
@@ -0,0 +1,491 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.data;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * {@link Indexed} specialized for storing variable-width binary values (such 
as utf8 encoded strings), which must be
+ * sorted and unique, using 'front coding'. Front coding is a type of delta 
encoding for byte arrays, where sorted
+ * values are grouped into buckets. The first value of the bucket is written 
entirely, and remaining values are stored
+ * as a pair of an integer which indicates how much of the first byte array of 
the bucket to use as a prefix, followed
+ * by the remaining bytes after the prefix to complete the value.
+ *
+ * Getting a value first picks the appropriate bucket, finds its offset in the 
underlying buffer, then scans the bucket
+ * values to seek to the correct position of the value within the bucket in 
order to reconstruct it using the prefix
+ * length.
+ *
+ * Finding the index of a value involves binary searching the first values of 
each bucket to find the correct bucket,
+ * then a linear scan within the bucket to find the matching value (or 
negative insertion point -1 for values that
+ * are not present).
+ *
+ * The value iterator reads an entire bucket at a time, reconstructing the 
values into an array to iterate within the
+ * bucket before moving onto the next bucket as the iterator is consumed.
+ */
+public final class FrontCodedIndexed implements Indexed<ByteBuffer>
+{
+  public static Supplier<FrontCodedIndexed> read(ByteBuffer buffer, ByteOrder 
ordering)
+  {
+    final ByteBuffer orderedBuffer = buffer.asReadOnlyBuffer().order(ordering);
+    final byte version = orderedBuffer.get();
+    Preconditions.checkArgument(version == 0, "only V0 exists, encountered " + 
version);
+    final int bucketSize = orderedBuffer.get();
+    final boolean hasNull = NullHandling.IS_NULL_BYTE == orderedBuffer.get();
+    final int numValues = VByte.readInt(orderedBuffer);
+    // size of offsets + values
+    final int size = VByte.readInt(orderedBuffer);
+    final int offsetsPosition = orderedBuffer.position();
+    // move position to end of buffer
+    buffer.position(offsetsPosition + size);
+
+    final int numBuckets = (int) Math.ceil((double) numValues / (double) 
bucketSize);
+    final int adjustIndex = hasNull ? 1 : 0;
+    final int div = Integer.numberOfTrailingZeros(bucketSize);
+    final int rem = bucketSize - 1;
+    return () -> new FrontCodedIndexed(
+        orderedBuffer,
+        bucketSize,
+        numBuckets,
+        (numValues & rem) == 0 ? bucketSize : numValues & rem,
+        hasNull,
+        numValues + adjustIndex,
+        adjustIndex,
+        div,
+        rem,
+        offsetsPosition,
+        offsetsPosition + ((numBuckets - 1) * Integer.BYTES)
+    );
+  }
+
+  private final ByteBuffer buffer;
+  private final int adjustedNumValues;
+  private final int adjustIndex;
+  private final int bucketSize;
+  private final int numBuckets;
+  private final int div;
+  private final int rem;
+  private final int offsetsPosition;
+  private final int bucketsPosition;
+  private final boolean hasNull;
+  private final int lastBucketNumValues;
+
+  private FrontCodedIndexed(
+      ByteBuffer buffer,
+      int bucketSize,
+      int numBuckets,
+      int lastBucketNumValues,
+      boolean hasNull,
+      int adjustedNumValues,

Review Comment:
   One wonders what is "adjusted" about `adjustedNumValues`. Explain?



##########
core/src/main/java/org/apache/druid/segment/data/VByte.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.data;
+
+import java.nio.ByteBuffer;
+
+public class VByte
+{
+  /**
+   * Read a variable byte (vbyte) encoded integer from a {@link ByteBuffer} at 
the current position.
+   *
+   * vbyte encoding stores values in the last 7 bits of a byte and reserves 
the high bit for the 'contination'. If 0,
+   * one or more aditional bytes must be read to complete the value, and a 1 
indicates the terminal byte. Because of
+   * this, it can only store positive values, and larger integers can take up 
to 5 bytes.
+   *
+   * implementation based on:
+   * 
https://github.com/lemire/JavaFastPFOR/blob/master/src/main/java/me/lemire/integercompression/VariableByte.java
+   *
+   */
+  public static int readInt(ByteBuffer buffer)

Review Comment:
   The garbage issue is a valid one: low-level code should minimize garbage. It 
would seem to be a bug if we create many `ByteBuffer`s. There should be one per 
column (or buffer), not one per column value. If that is true, then the number 
should be reasonable.
   
   @clintropolis suggests that, to take a position as input, we have to return 
a length for variable-length fields. This generates garbage per-call as we 
create, then discard the required `Pair` and `Integer` objects. One could pass 
an `AtomicInteger`, but that seems fiddly.
   
   This implementation seems the correct one. If we have a "too many 
`ByteBuffer`s" problem, perhaps we should fix that instead.
   



##########
processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexed.java:
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.data;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * {@link Indexed} specialized for storing variable-width binary values (such 
as utf8 encoded strings), which must be
+ * sorted and unique, using 'front coding'. Front coding is a type of delta 
encoding for byte arrays, where sorted
+ * values are grouped into buckets. The first value of the bucket is written 
entirely, and remaining values are stored
+ * as a pair of an integer which indicates how much of the first byte array of 
the bucket to use as a prefix, followed
+ * by the remaining bytes after the prefix to complete the value.
+ *
+ * Getting a value first picks the appropriate bucket, finds its offset in the 
underlying buffer, then scans the bucket
+ * values to seek to the correct position of the value within the bucket in 
order to reconstruct it using the prefix
+ * length.
+ *
+ * Finding the index of a value involves binary searching the first values of 
each bucket to find the correct bucket,
+ * then a linear scan within the bucket to find the matching value (or 
negative insertion point -1 for values that
+ * are not present).
+ *
+ * The value iterator reads an entire bucket at a time, reconstructing the 
values into an array to iterate within the
+ * bucket before moving onto the next bucket as the iterator is consumed.
+ */
+public final class FrontCodedIndexed implements Indexed<ByteBuffer>
+{
+  public static FrontCodedIndexed read(ByteBuffer buffer, 
Comparator<ByteBuffer> comparator, ByteOrder ordering)
+  {
+    final ByteBuffer copy = buffer.asReadOnlyBuffer().order(ordering);
+    final byte version = copy.get();
+    Preconditions.checkArgument(version == 0, "only V0 exists, encountered " + 
version);
+    final int bucketSize = copy.get();
+    final boolean hasNull = NullHandling.IS_NULL_BYTE == copy.get();
+    final int numValues = VByte.readInt(copy);
+    // size of offsets + values
+    final int size = VByte.readInt(copy);
+    // move position to end of buffer
+    buffer.position(copy.position() + size);
+
+    final int numBuckets = (int) Math.ceil((double) numValues / (double) 
bucketSize);
+    final int adjustIndex = hasNull ? 1 : 0;
+    final int div = Integer.numberOfTrailingZeros(bucketSize);
+    final int rem = bucketSize - 1;
+    return new FrontCodedIndexed(
+        copy,
+        comparator,
+        bucketSize,
+        numBuckets,
+        (numValues & rem) == 0 ? bucketSize : numValues & rem,
+        hasNull,
+        numValues + adjustIndex,
+        adjustIndex,
+        div,
+        rem,
+        copy.position(),
+        copy.position() + ((numBuckets - 1) * Integer.BYTES)
+    );
+  }
+
+  private final ByteBuffer buffer;
+  private final int adjustedNumValues;
+  private final int adjustIndex;
+  private final int bucketSize;
+  private final int numBuckets;
+  private final int div;
+  private final int rem;
+  private final int offsetsPosition;
+  private final int bucketsPosition;
+  private final boolean hasNull;
+  private final int lastBucketNumValues;
+  private final Comparator<ByteBuffer> comparator;
+
+  private FrontCodedIndexed(
+      ByteBuffer buffer,
+      Comparator<ByteBuffer> comparator,
+      int bucketSize,
+      int numBuckets,
+      int lastBucketNumValues,
+      boolean hasNull,
+      int adjustedNumValues,
+      int adjustIndex,
+      int div,
+      int rem,
+      int offsetsPosition,
+      int bucketsPosition
+  )
+  {
+    if (Integer.bitCount(bucketSize) != 1) {
+      throw new ISE("bucketSize must be a power of two but was[%,d]", 
bucketSize);
+    }
+    this.buffer = buffer;
+    this.comparator = comparator;
+    this.bucketSize = bucketSize;
+    this.hasNull = hasNull;
+    this.adjustedNumValues = adjustedNumValues;
+    this.adjustIndex = adjustIndex;
+    this.div = div;
+    this.rem = rem;
+    this.numBuckets = numBuckets;
+    this.offsetsPosition = offsetsPosition;
+    this.bucketsPosition = bucketsPosition;
+    this.lastBucketNumValues = lastBucketNumValues;
+  }
+
+  @Override
+  public int size()
+  {
+    return adjustedNumValues;
+  }
+
+  @Nullable
+  @Override
+  public ByteBuffer get(int index)
+  {
+    final int adjustedIndex;
+    // due to vbyte encoding, the null value is not actually stored in the 
bucket (no negative values), so we adjust
+    // the index

Review Comment:
   The presence of a null value seemed to be used elsewhere to tell us whether 
the column contains nulls (if I understood correctly.) This suggests we need a 
null entry only if there are, in fact, null values. In SQL, NULL and "" are 
distinct things. There is, unfortunately, no string shorter than length 0. 
However, if we encode the entry as (length, bytes), can we use a length of all 
1s (i.e. -1) to indicate a null value which would be present only if the column 
contains at least one null (in SQL-speak, if the column is nullable?)



##########
processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexedWriter.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.data;
+
+import com.google.common.primitives.Ints;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.io.Channels;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import org.apache.druid.segment.writeout.WriteOutBytes;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.WritableByteChannel;
+
+
+/**
+ * {@link DictionaryWriter} for a {@link FrontCodedIndexed}, written to a 
{@link SegmentWriteOutMedium}. Values MUST
+ * be added to this dictionary writer in sorted order, which is enforced.
+ *
+ * Front coding is a type of delta encoding for byte arrays, where values are 
grouped into buckets. The first value of
+ * the bucket is written entirely, and remaining values are stored as pairs of 
an integer which indicates how much
+ * of the first byte array of the bucket to use as a prefix, followed by the 
remaining value bytes after the prefix.
+ *
+ * This is valid to use for any values which can be compared byte by byte with 
unsigned comparison. Otherwise, this
+ * is not the collection for you.
+ */
+public class FrontCodedIndexedWriter implements DictionaryWriter<byte[]>
+{
+  private static final int MAX_LOG_BUFFER_SIZE = 26;
+  private final SegmentWriteOutMedium segmentWriteOutMedium;
+  private final int bucketSize;
+  private final ByteOrder byteOrder;
+  private final byte[][] bucketBuffer;
+  @Nullable
+  private byte[] prevObject = null;
+  @Nullable
+  private WriteOutBytes headerOut = null;
+  @Nullable
+  private WriteOutBytes valuesOut = null;
+  private int numWritten = 0;
+  private ByteBuffer scratch;
+  private int logScratchSize = 10;
+  private boolean isClosed = false;
+  private boolean hasNulls = false;
+
+
+  public FrontCodedIndexedWriter(
+      SegmentWriteOutMedium segmentWriteOutMedium,
+      ByteOrder byteOrder,
+      int bucketSize
+  )
+  {
+    this.segmentWriteOutMedium = segmentWriteOutMedium;
+    this.scratch = ByteBuffer.allocate(1 << logScratchSize).order(byteOrder);
+    this.bucketSize = bucketSize;
+    this.byteOrder = byteOrder;
+    this.bucketBuffer = new byte[bucketSize][];
+  }
+
+  @Override
+  public void open() throws IOException
+  {
+    headerOut = segmentWriteOutMedium.makeWriteOutBytes();
+    valuesOut = segmentWriteOutMedium.makeWriteOutBytes();
+  }
+
+  @Override
+  public void write(@Nullable byte[] value) throws IOException
+  {
+    if (prevObject != null && unsignedCompare(prevObject, value) >= 0) {
+      throw new ISE(
+          "Values must be sorted and unique. Element [%s] with value [%s] is 
before or equivalent to [%s]",
+          numWritten,
+          value == null ? null : StringUtils.fromUtf8(value),
+          StringUtils.fromUtf8(prevObject)
+      );
+    }
+
+    if (value == null) {
+      hasNulls = true;
+      return;
+    }
+
+    // if the bucket buffer is full, write the bucket
+    if (numWritten > 0 && (numWritten % bucketSize) == 0) {

Review Comment:
   Where do we pick the optimal bucket "key frames"? It looks like we write 
buckets until they are full. Doesn't this put us at the mercy of the data 
distribution rather than trying to find (near) optimal key values? That is, in 
the worst case, we could have a list of "a", "b", "ba", "bab", "bob", ... and 
our bucket would start with "a" and there would be no compression. If we 
instead make a 1-value bucket, then started the next with "b", w'd save 3 bytes 
from the following values.
   
   The papers linked in the PR suggest a sampling algorithm, but it does appear 
to make multiple passes over the data, and thus can't be done streaming. Or, is 
this one of the future enhancements mentioned in the description? If it is, 
does this encoding support variable-sized buckets? We'd still have *n* buckets, 
but sizes would vary around the (value count / n) mean.



##########
processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexed.java:
##########
@@ -0,0 +1,491 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.data;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * {@link Indexed} specialized for storing variable-width binary values (such 
as utf8 encoded strings), which must be
+ * sorted and unique, using 'front coding'. Front coding is a type of delta 
encoding for byte arrays, where sorted
+ * values are grouped into buckets. The first value of the bucket is written 
entirely, and remaining values are stored
+ * as a pair of an integer which indicates how much of the first byte array of 
the bucket to use as a prefix, followed
+ * by the remaining bytes after the prefix to complete the value.
+ *
+ * Getting a value first picks the appropriate bucket, finds its offset in the 
underlying buffer, then scans the bucket
+ * values to seek to the correct position of the value within the bucket in 
order to reconstruct it using the prefix
+ * length.
+ *
+ * Finding the index of a value involves binary searching the first values of 
each bucket to find the correct bucket,
+ * then a linear scan within the bucket to find the matching value (or 
negative insertion point -1 for values that
+ * are not present).
+ *
+ * The value iterator reads an entire bucket at a time, reconstructing the 
values into an array to iterate within the
+ * bucket before moving onto the next bucket as the iterator is consumed.

Review Comment:
   Thanks for the explanation! Suggestion: copy the layout description from the 
PR description into Javadoc here. Future readers will find it hard to follow 
the code without that context.
   
   Would also recommend linking the papers cited in the PR description, since 
they are key to understanding the algorithm.



##########
processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexed.java:
##########
@@ -0,0 +1,491 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.data;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * {@link Indexed} specialized for storing variable-width binary values (such 
as utf8 encoded strings), which must be
+ * sorted and unique, using 'front coding'. Front coding is a type of delta 
encoding for byte arrays, where sorted
+ * values are grouped into buckets. The first value of the bucket is written 
entirely, and remaining values are stored
+ * as a pair of an integer which indicates how much of the first byte array of 
the bucket to use as a prefix, followed
+ * by the remaining bytes after the prefix to complete the value.
+ *
+ * Getting a value first picks the appropriate bucket, finds its offset in the 
underlying buffer, then scans the bucket
+ * values to seek to the correct position of the value within the bucket in 
order to reconstruct it using the prefix
+ * length.
+ *
+ * Finding the index of a value involves binary searching the first values of 
each bucket to find the correct bucket,
+ * then a linear scan within the bucket to find the matching value (or 
negative insertion point -1 for values that
+ * are not present).
+ *
+ * The value iterator reads an entire bucket at a time, reconstructing the 
values into an array to iterate within the
+ * bucket before moving onto the next bucket as the iterator is consumed.
+ */
+public final class FrontCodedIndexed implements Indexed<ByteBuffer>
+{
+  public static Supplier<FrontCodedIndexed> read(ByteBuffer buffer, ByteOrder 
ordering)
+  {
+    final ByteBuffer orderedBuffer = buffer.asReadOnlyBuffer().order(ordering);
+    final byte version = orderedBuffer.get();
+    Preconditions.checkArgument(version == 0, "only V0 exists, encountered " + 
version);
+    final int bucketSize = orderedBuffer.get();
+    final boolean hasNull = NullHandling.IS_NULL_BYTE == orderedBuffer.get();
+    final int numValues = VByte.readInt(orderedBuffer);
+    // size of offsets + values
+    final int size = VByte.readInt(orderedBuffer);
+    final int offsetsPosition = orderedBuffer.position();
+    // move position to end of buffer
+    buffer.position(offsetsPosition + size);
+
+    final int numBuckets = (int) Math.ceil((double) numValues / (double) 
bucketSize);

Review Comment:
   The key to this math is to realize that `bucketSize` must be a power of 2, 
as explained in the error message below. A comment here to reinforce that idea? 
Also, I read `bucketSize` as "size in bytes". I think it actually means "number 
of entries." Bucket size-in-bytes is variable, hence the bucket index. A 
comment would help clarify.



##########
processing/src/main/java/org/apache/druid/segment/column/StringFrontCodedDictionaryEncodedColumn.java:
##########
@@ -0,0 +1,598 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.column;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.extraction.ExtractionFn;
+import org.apache.druid.query.filter.ValueMatcher;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.AbstractDimensionSelector;
+import org.apache.druid.segment.DimensionSelectorUtils;
+import org.apache.druid.segment.IdLookup;
+import org.apache.druid.segment.data.ColumnarInts;
+import org.apache.druid.segment.data.ColumnarMultiInts;
+import org.apache.druid.segment.data.FrontCodedIndexed;
+import org.apache.druid.segment.data.IndexedInts;
+import org.apache.druid.segment.data.ReadableOffset;
+import org.apache.druid.segment.data.SingleIndexedInt;
+import org.apache.druid.segment.filter.BooleanValueMatcher;
+import org.apache.druid.segment.historical.HistoricalDimensionSelector;
+import 
org.apache.druid.segment.historical.SingleValueHistoricalDimensionSelector;
+import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
+import org.apache.druid.segment.vector.ReadableVectorInspector;
+import org.apache.druid.segment.vector.ReadableVectorOffset;
+import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
+import org.apache.druid.segment.vector.VectorObjectSelector;
+import org.apache.druid.utils.CloseableUtils;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+
+/**
+ * {@link DictionaryEncodedColumn<String>} for a column which uses a {@link 
FrontCodedIndexed} to store its value
+ * dictionary, which 'delta encodes' strings (instead of {@link 
org.apache.druid.segment.data.GenericIndexed} like
+ * {@link StringDictionaryEncodedColumn}).
+ *
+ * This class is otherwise nearly identical to {@link 
StringDictionaryEncodedColumn} other than the dictionary
+ * difference.
+ */
+public class StringFrontCodedDictionaryEncodedColumn implements 
DictionaryEncodedColumn<String>
+{
+  @Nullable
+  private final ColumnarInts column;
+  @Nullable
+  private final ColumnarMultiInts multiValueColumn;
+  private final FrontCodedIndexed utf8Dictionary;
+
+  public StringFrontCodedDictionaryEncodedColumn(
+      @Nullable ColumnarInts singleValueColumn,
+      @Nullable ColumnarMultiInts multiValueColumn,
+      FrontCodedIndexed utf8Dictionary
+  )
+  {
+    this.column = singleValueColumn;
+    this.multiValueColumn = multiValueColumn;
+    this.utf8Dictionary = utf8Dictionary;
+  }
+
+  @Override
+  public int length()
+  {
+    return hasMultipleValues() ? multiValueColumn.size() : column.size();
+  }
+
+  @Override
+  public boolean hasMultipleValues()
+  {
+    return column == null;
+  }
+
+  @Override
+  public int getSingleValueRow(int rowNum)
+  {
+    return column.get(rowNum);
+  }
+
+  @Override
+  public IndexedInts getMultiValueRow(int rowNum)
+  {
+    return multiValueColumn.get(rowNum);
+  }
+
+  @Override
+  @Nullable
+  public String lookupName(int id)
+  {
+    final ByteBuffer buffer = utf8Dictionary.get(id);
+    if (buffer == null) {
+      return null;
+    }
+    return StringUtils.fromUtf8(buffer);
+  }
+
+  @Override
+  public int lookupId(String name)
+  {
+    return utf8Dictionary.indexOf(StringUtils.toUtf8ByteBuffer(name));
+  }
+
+  @Override
+  public int getCardinality()
+  {
+    return utf8Dictionary.size();
+  }
+
+  @Override
+  public HistoricalDimensionSelector makeDimensionSelector(
+      final ReadableOffset offset,
+      @Nullable final ExtractionFn extractionFn
+  )
+  {
+    abstract class QueryableDimensionSelector extends AbstractDimensionSelector
+        implements HistoricalDimensionSelector, IdLookup
+    {
+      @Override
+      public int getValueCardinality()
+      {
+        /*
+         This is technically wrong if
+         extractionFn != null && (extractionFn.getExtractionType() != 
ExtractionFn.ExtractionType.ONE_TO_ONE ||
+                                    !extractionFn.preservesOrdering())
+         However current behavior allows some GroupBy-V1 queries to work that 
wouldn't work otherwise and doesn't
+         cause any problems due to special handling of extractionFn everywhere.
+         See https://github.com/apache/druid/pull/8433
+         */
+        return getCardinality();
+      }
+
+      @Override
+      public String lookupName(int id)
+      {
+        final String value = 
StringFrontCodedDictionaryEncodedColumn.this.lookupName(id);
+        return extractionFn == null ? value : extractionFn.apply(value);
+      }
+
+      @Nullable
+      @Override
+      public ByteBuffer lookupNameUtf8(int id)
+      {
+        return utf8Dictionary.get(id);
+      }
+
+      @Override
+      public boolean supportsLookupNameUtf8()
+      {
+        return true;
+      }
+
+      @Override
+      public boolean nameLookupPossibleInAdvance()
+      {
+        return true;
+      }
+
+      @Nullable
+      @Override
+      public IdLookup idLookup()
+      {
+        return extractionFn == null ? this : null;
+      }
+
+      @Override
+      public int lookupId(String name)
+      {
+        if (extractionFn != null) {
+          throw new UnsupportedOperationException("cannot perform lookup when 
applying an extraction function");
+        }
+        return StringFrontCodedDictionaryEncodedColumn.this.lookupId(name);
+      }
+    }
+
+    if (hasMultipleValues()) {
+      class MultiValueDimensionSelector extends QueryableDimensionSelector
+      {
+        @Override
+        public IndexedInts getRow()
+        {
+          return multiValueColumn.get(offset.getOffset());
+        }
+
+        @Override
+        public IndexedInts getRow(int offset)
+        {
+          return multiValueColumn.get(offset);
+        }
+
+        @Override
+        public ValueMatcher makeValueMatcher(@Nullable String value)
+        {
+          return DimensionSelectorUtils.makeValueMatcherGeneric(this, value);
+        }
+
+        @Override
+        public ValueMatcher makeValueMatcher(Predicate<String> predicate)
+        {
+          return DimensionSelectorUtils.makeValueMatcherGeneric(this, 
predicate);
+        }
+
+        @Nullable
+        @Override
+        public Object getObject()
+        {
+          return defaultGetObject();
+        }
+
+        @Override
+        public Class classOfObject()
+        {
+          return Object.class;
+        }
+
+        @Override
+        public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+        {
+          inspector.visit("multiValueColumn", multiValueColumn);
+          inspector.visit("offset", offset);
+          inspector.visit("extractionFn", extractionFn);
+        }
+      }
+      return new MultiValueDimensionSelector();
+    } else {
+      class SingleValueQueryableDimensionSelector extends 
QueryableDimensionSelector
+          implements SingleValueHistoricalDimensionSelector
+      {
+        private final SingleIndexedInt row = new SingleIndexedInt();
+
+        @Override
+        public IndexedInts getRow()
+        {
+          row.setValue(getRowValue());
+          return row;
+        }
+
+        public int getRowValue()
+        {
+          return column.get(offset.getOffset());
+        }
+
+        @Override
+        public IndexedInts getRow(int offset)
+        {
+          row.setValue(getRowValue(offset));
+          return row;
+        }
+
+        @Override
+        public int getRowValue(int offset)
+        {
+          return column.get(offset);
+        }
+
+        @Override
+        public ValueMatcher makeValueMatcher(final @Nullable String value)
+        {
+          if (extractionFn == null) {
+            final int valueId = super.lookupId(value);
+            if (valueId >= 0) {
+              return new ValueMatcher()
+              {
+                @Override
+                public boolean matches()
+                {
+                  return getRowValue() == valueId;
+                }
+
+                @Override
+                public void inspectRuntimeShape(RuntimeShapeInspector 
inspector)
+                {
+                  inspector.visit("column", 
StringFrontCodedDictionaryEncodedColumn.this);
+                }
+              };
+            } else {
+              return BooleanValueMatcher.of(false);
+            }
+          } else {
+            // Employ caching BitSet optimization
+            return makeValueMatcher(Predicates.equalTo(value));
+          }
+        }
+
+        @Override
+        public ValueMatcher makeValueMatcher(final Predicate<String> predicate)
+        {
+          final BitSet checkedIds = new BitSet(getCardinality());
+          final BitSet matchingIds = new BitSet(getCardinality());
+
+          // Lazy matcher; only check an id if matches() is called.
+          return new ValueMatcher()
+          {
+            @Override
+            public boolean matches()
+            {
+              final int id = getRowValue();
+
+              if (checkedIds.get(id)) {
+                return matchingIds.get(id);
+              } else {
+                final boolean matches = predicate.apply(lookupName(id));
+                checkedIds.set(id);
+                if (matches) {
+                  matchingIds.set(id);
+                }
+                return matches;
+              }
+            }
+
+            @Override
+            public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+            {
+              inspector.visit("column", 
StringFrontCodedDictionaryEncodedColumn.this);
+            }
+          };
+        }
+
+        @Override
+        public Object getObject()
+        {
+          return super.lookupName(getRowValue());
+        }
+
+        @Override
+        public Class classOfObject()
+        {
+          return String.class;
+        }
+
+        @Override
+        public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+        {
+          inspector.visit("column", column);
+          inspector.visit("offset", offset);
+          inspector.visit("extractionFn", extractionFn);
+        }
+      }
+      return new SingleValueQueryableDimensionSelector();
+    }
+  }
+
+  @Override
+  public SingleValueDimensionVectorSelector 
makeSingleValueDimensionVectorSelector(final ReadableVectorOffset offset)
+  {
+    class QueryableSingleValueDimensionVectorSelector implements 
SingleValueDimensionVectorSelector, IdLookup
+    {
+      private final int[] vector = new int[offset.getMaxVectorSize()];
+      private int id = ReadableVectorInspector.NULL_ID;
+
+      @Override
+      public int[] getRowVector()
+      {
+        if (id == offset.getId()) {
+          return vector;
+        }
+
+        if (offset.isContiguous()) {
+          column.get(vector, offset.getStartOffset(), 
offset.getCurrentVectorSize());
+        } else {
+          column.get(vector, offset.getOffsets(), 
offset.getCurrentVectorSize());
+        }
+
+        id = offset.getId();
+        return vector;
+      }
+
+      @Override
+      public int getValueCardinality()
+      {
+        return getCardinality();
+      }
+
+      @Nullable
+      @Override
+      public String lookupName(final int id)
+      {
+        return StringFrontCodedDictionaryEncodedColumn.this.lookupName(id);
+      }
+
+      @Nullable
+      @Override
+      public ByteBuffer lookupNameUtf8(int id)
+      {
+        return utf8Dictionary.get(id);
+      }
+
+      @Override
+      public boolean supportsLookupNameUtf8()
+      {
+        return true;
+      }
+
+      @Override
+      public boolean nameLookupPossibleInAdvance()
+      {
+        return true;
+      }
+
+      @Nullable
+      @Override
+      public IdLookup idLookup()
+      {
+        return this;
+      }
+
+      @Override
+      public int lookupId(@Nullable final String name)
+      {
+        return StringFrontCodedDictionaryEncodedColumn.this.lookupId(name);
+      }
+
+      @Override
+      public int getCurrentVectorSize()
+      {
+        return offset.getCurrentVectorSize();
+      }
+
+      @Override
+      public int getMaxVectorSize()
+      {
+        return offset.getMaxVectorSize();
+      }
+    }
+
+    return new QueryableSingleValueDimensionVectorSelector();
+  }
+
+  @Override
+  public MultiValueDimensionVectorSelector 
makeMultiValueDimensionVectorSelector(final ReadableVectorOffset offset)
+  {
+    class QueryableMultiValueDimensionVectorSelector implements 
MultiValueDimensionVectorSelector, IdLookup
+    {
+      private final IndexedInts[] vector = new 
IndexedInts[offset.getMaxVectorSize()];
+      private int id = ReadableVectorInspector.NULL_ID;
+
+      @Override
+      public IndexedInts[] getRowVector()
+      {
+        if (id == offset.getId()) {
+          return vector;
+        }
+
+        if (offset.isContiguous()) {
+          final int currentOffset = offset.getStartOffset();
+          final int numRows = offset.getCurrentVectorSize();
+
+          for (int i = 0; i < numRows; i++) {
+            // Must use getUnshared, otherwise all elements in the vector 
could be the same shared object.

Review Comment:
   Why would that be a bad thing? Are these objects mutated?



##########
processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexedWriter.java:
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.data;
+
+import com.google.common.primitives.Ints;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.io.Channels;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import org.apache.druid.segment.writeout.WriteOutBytes;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.WritableByteChannel;
+
+
+/**
+ * {@link DictionaryWriter} for a {@link FrontCodedIndexed}, written to a 
{@link SegmentWriteOutMedium}. Values MUST
+ * be added to this dictionary writer in sorted order, which is enforced.
+ *
+ * Front coding is a type of delta encoding for byte arrays, where values are 
grouped into buckets. The first value of
+ * the bucket is written entirely, and remaining values are stored as pairs of 
an integer which indicates how much
+ * of the first byte array of the bucket to use as a prefix, followed by the 
remaining value bytes after the prefix.
+ *
+ * This is valid to use for any values which can be compared byte by byte with 
unsigned comparison. Otherwise, this
+ * is not the collection for you.
+ */
+public class FrontCodedIndexedWriter implements DictionaryWriter<byte[]>
+{
+  private static final int MAX_LOG_BUFFER_SIZE = 26;
+  private final SegmentWriteOutMedium segmentWriteOutMedium;
+  private final int bucketSize;
+  private final ByteOrder byteOrder;
+  private final byte[][] bucketBuffer;
+  private final ByteBuffer getOffsetBuffer;
+  private final int div;
+
+  @Nullable
+  private byte[] prevObject = null;
+  @Nullable
+  private WriteOutBytes headerOut = null;
+  @Nullable
+  private WriteOutBytes valuesOut = null;
+  private int numWritten = 0;
+  private ByteBuffer scratch;
+  private int logScratchSize = 10;
+  private boolean isClosed = false;
+  private boolean hasNulls = false;
+
+  public FrontCodedIndexedWriter(
+      SegmentWriteOutMedium segmentWriteOutMedium,
+      ByteOrder byteOrder,
+      int bucketSize
+  )
+  {
+    this.segmentWriteOutMedium = segmentWriteOutMedium;
+    this.scratch = ByteBuffer.allocate(1 << logScratchSize).order(byteOrder);
+    this.bucketSize = bucketSize;
+    this.byteOrder = byteOrder;
+    this.bucketBuffer = new byte[bucketSize][];
+    this.getOffsetBuffer = ByteBuffer.allocate(Integer.BYTES).order(byteOrder);
+    this.div = Integer.numberOfTrailingZeros(bucketSize);
+  }
+
+  @Override
+  public void open() throws IOException
+  {
+    headerOut = segmentWriteOutMedium.makeWriteOutBytes();
+    valuesOut = segmentWriteOutMedium.makeWriteOutBytes();
+  }
+
+  @Override
+  public void write(@Nullable byte[] value) throws IOException
+  {
+    if (prevObject != null && unsignedCompare(prevObject, value) >= 0) {
+      throw new ISE(
+          "Values must be sorted and unique. Element [%s] with value [%s] is 
before or equivalent to [%s]",
+          numWritten,
+          value == null ? null : StringUtils.fromUtf8(value),
+          StringUtils.fromUtf8(prevObject)
+      );
+    }
+
+    if (value == null) {
+      hasNulls = true;
+      return;
+    }
+
+    // if the bucket buffer is full, write the bucket
+    if (numWritten > 0 && (numWritten % bucketSize) == 0) {
+      resetScratch();
+      int written;
+      // write the bucket, growing scratch buffer as necessary
+      do {
+        written = writeBucket(scratch, bucketBuffer, bucketSize);
+        if (written < 0) {
+          growScratch();
+        }
+      } while (written < 0);
+      scratch.flip();
+      Channels.writeFully(valuesOut, scratch);
+
+      resetScratch();
+      // write end offset for current value
+      scratch.putInt((int) valuesOut.size());
+      scratch.flip();
+      Channels.writeFully(headerOut, scratch);
+    }
+
+    bucketBuffer[numWritten % bucketSize] = value;
+
+    ++numWritten;
+    prevObject = value;
+  }
+
+
+  @Override
+  public long getSerializedSize() throws IOException
+  {
+    if (!isClosed) {
+      flush();
+    }
+    int headerAndValues = Ints.checkedCast(headerOut.size() + 
valuesOut.size());
+    return Byte.BYTES +
+           Byte.BYTES +
+           Byte.BYTES +
+           VByte.computeIntSize(numWritten) +
+           VByte.computeIntSize(headerAndValues) +
+           headerAndValues;
+  }
+
+  @Override
+  public void writeTo(WritableByteChannel channel, FileSmoosher smoosher) 
throws IOException
+  {
+    if (!isClosed) {
+      flush();
+    }
+    resetScratch();
+    // version 0
+    scratch.put((byte) 0);
+    scratch.put((byte) bucketSize);
+    scratch.put(hasNulls ? NullHandling.IS_NULL_BYTE : 
NullHandling.IS_NOT_NULL_BYTE);
+    VByte.writeInt(scratch, numWritten);
+    VByte.writeInt(scratch, Ints.checkedCast(headerOut.size() + 
valuesOut.size()));
+    scratch.flip();
+    Channels.writeFully(channel, scratch);
+    headerOut.writeTo(channel);
+    valuesOut.writeTo(channel);
+  }
+
+  @Override
+  public boolean isSorted()
+  {
+    return true;
+  }
+
+  @Nullable
+  @Override
+  public byte[] get(int index) throws IOException

Review Comment:
   One wonders what a `get` is doing in a writer...



##########
processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexed.java:
##########
@@ -0,0 +1,491 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.data;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * {@link Indexed} specialized for storing variable-width binary values (such 
as utf8 encoded strings), which must be
+ * sorted and unique, using 'front coding'. Front coding is a type of delta 
encoding for byte arrays, where sorted
+ * values are grouped into buckets. The first value of the bucket is written 
entirely, and remaining values are stored
+ * as a pair of an integer which indicates how much of the first byte array of 
the bucket to use as a prefix, followed
+ * by the remaining bytes after the prefix to complete the value.
+ *
+ * Getting a value first picks the appropriate bucket, finds its offset in the 
underlying buffer, then scans the bucket
+ * values to seek to the correct position of the value within the bucket in 
order to reconstruct it using the prefix
+ * length.
+ *
+ * Finding the index of a value involves binary searching the first values of 
each bucket to find the correct bucket,
+ * then a linear scan within the bucket to find the matching value (or 
negative insertion point -1 for values that
+ * are not present).
+ *
+ * The value iterator reads an entire bucket at a time, reconstructing the 
values into an array to iterate within the
+ * bucket before moving onto the next bucket as the iterator is consumed.
+ */
+public final class FrontCodedIndexed implements Indexed<ByteBuffer>
+{
+  public static Supplier<FrontCodedIndexed> read(ByteBuffer buffer, ByteOrder 
ordering)
+  {
+    final ByteBuffer orderedBuffer = buffer.asReadOnlyBuffer().order(ordering);
+    final byte version = orderedBuffer.get();
+    Preconditions.checkArgument(version == 0, "only V0 exists, encountered " + 
version);
+    final int bucketSize = orderedBuffer.get();
+    final boolean hasNull = NullHandling.IS_NULL_BYTE == orderedBuffer.get();
+    final int numValues = VByte.readInt(orderedBuffer);
+    // size of offsets + values
+    final int size = VByte.readInt(orderedBuffer);
+    final int offsetsPosition = orderedBuffer.position();
+    // move position to end of buffer
+    buffer.position(offsetsPosition + size);
+
+    final int numBuckets = (int) Math.ceil((double) numValues / (double) 
bucketSize);
+    final int adjustIndex = hasNull ? 1 : 0;
+    final int div = Integer.numberOfTrailingZeros(bucketSize);
+    final int rem = bucketSize - 1;
+    return () -> new FrontCodedIndexed(
+        orderedBuffer,
+        bucketSize,
+        numBuckets,
+        (numValues & rem) == 0 ? bucketSize : numValues & rem,
+        hasNull,
+        numValues + adjustIndex,
+        adjustIndex,
+        div,
+        rem,
+        offsetsPosition,
+        offsetsPosition + ((numBuckets - 1) * Integer.BYTES)
+    );
+  }
+
+  private final ByteBuffer buffer;
+  private final int adjustedNumValues;
+  private final int adjustIndex;
+  private final int bucketSize;
+  private final int numBuckets;
+  private final int div;
+  private final int rem;
+  private final int offsetsPosition;
+  private final int bucketsPosition;
+  private final boolean hasNull;
+  private final int lastBucketNumValues;
+
+  private FrontCodedIndexed(
+      ByteBuffer buffer,
+      int bucketSize,
+      int numBuckets,
+      int lastBucketNumValues,
+      boolean hasNull,
+      int adjustedNumValues,
+      int adjustIndex,

Review Comment:
   Isn't this value directly determined by `hasNull`? Is this redundant value 
needed or can it be computed?



##########
processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexed.java:
##########
@@ -0,0 +1,491 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.data;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * {@link Indexed} specialized for storing variable-width binary values (such 
as utf8 encoded strings), which must be
+ * sorted and unique, using 'front coding'. Front coding is a type of delta 
encoding for byte arrays, where sorted
+ * values are grouped into buckets. The first value of the bucket is written 
entirely, and remaining values are stored
+ * as a pair of an integer which indicates how much of the first byte array of 
the bucket to use as a prefix, followed
+ * by the remaining bytes after the prefix to complete the value.
+ *
+ * Getting a value first picks the appropriate bucket, finds its offset in the 
underlying buffer, then scans the bucket
+ * values to seek to the correct position of the value within the bucket in 
order to reconstruct it using the prefix
+ * length.
+ *
+ * Finding the index of a value involves binary searching the first values of 
each bucket to find the correct bucket,
+ * then a linear scan within the bucket to find the matching value (or 
negative insertion point -1 for values that
+ * are not present).
+ *
+ * The value iterator reads an entire bucket at a time, reconstructing the 
values into an array to iterate within the
+ * bucket before moving onto the next bucket as the iterator is consumed.
+ */
+public final class FrontCodedIndexed implements Indexed<ByteBuffer>
+{
+  public static Supplier<FrontCodedIndexed> read(ByteBuffer buffer, ByteOrder 
ordering)
+  {
+    final ByteBuffer orderedBuffer = buffer.asReadOnlyBuffer().order(ordering);
+    final byte version = orderedBuffer.get();
+    Preconditions.checkArgument(version == 0, "only V0 exists, encountered " + 
version);
+    final int bucketSize = orderedBuffer.get();
+    final boolean hasNull = NullHandling.IS_NULL_BYTE == orderedBuffer.get();
+    final int numValues = VByte.readInt(orderedBuffer);
+    // size of offsets + values
+    final int size = VByte.readInt(orderedBuffer);
+    final int offsetsPosition = orderedBuffer.position();
+    // move position to end of buffer
+    buffer.position(offsetsPosition + size);
+
+    final int numBuckets = (int) Math.ceil((double) numValues / (double) 
bucketSize);
+    final int adjustIndex = hasNull ? 1 : 0;
+    final int div = Integer.numberOfTrailingZeros(bucketSize);
+    final int rem = bucketSize - 1;
+    return () -> new FrontCodedIndexed(

Review Comment:
   Suggestion: rather than making `FrontCodedIndexed` dumb and this `Supplier` 
smart, does it make sense to pass the parameters into the constructor and 
let it do the math? Otherwise, we'd have to check every place the ctor is 
used to ensure that it does the same math. (And, if it is used only once, the 
ctor is trusting an external source to do its math for it...)
   
   Maybe there are two sources: write and read? If so, can the common stuff be 
in the ctor? Or, provide two ctors, one for each case?



##########
processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexed.java:
##########
@@ -0,0 +1,491 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.data;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * {@link Indexed} specialized for storing variable-width binary values (such 
as utf8 encoded strings), which must be
+ * sorted and unique, using 'front coding'. Front coding is a type of delta 
encoding for byte arrays, where sorted
+ * values are grouped into buckets. The first value of the bucket is written 
entirely, and remaining values are stored
+ * as a pair of an integer which indicates how much of the first byte array of 
the bucket to use as a prefix, followed
+ * by the remaining bytes after the prefix to complete the value.
+ *
+ * Getting a value first picks the appropriate bucket, finds its offset in the 
underlying buffer, then scans the bucket
+ * values to seek to the correct position of the value within the bucket in 
order to reconstruct it using the prefix
+ * length.
+ *
+ * Finding the index of a value involves binary searching the first values of 
each bucket to find the correct bucket,
+ * then a linear scan within the bucket to find the matching value (or 
negative insertion point -1 for values that
+ * are not present).
+ *
+ * The value iterator reads an entire bucket at a time, reconstructing the 
values into an array to iterate within the
+ * bucket before moving onto the next bucket as the iterator is consumed.
+ */
+public final class FrontCodedIndexed implements Indexed<ByteBuffer>
+{
+  public static Supplier<FrontCodedIndexed> read(ByteBuffer buffer, ByteOrder 
ordering)
+  {
+    final ByteBuffer orderedBuffer = buffer.asReadOnlyBuffer().order(ordering);
+    final byte version = orderedBuffer.get();
+    Preconditions.checkArgument(version == 0, "only V0 exists, encountered " + 
version);
+    final int bucketSize = orderedBuffer.get();
+    final boolean hasNull = NullHandling.IS_NULL_BYTE == orderedBuffer.get();
+    final int numValues = VByte.readInt(orderedBuffer);
+    // size of offsets + values
+    final int size = VByte.readInt(orderedBuffer);
+    final int offsetsPosition = orderedBuffer.position();
+    // move position to end of buffer
+    buffer.position(offsetsPosition + size);
+
+    final int numBuckets = (int) Math.ceil((double) numValues / (double) 
bucketSize);
+    final int adjustIndex = hasNull ? 1 : 0;
+    final int div = Integer.numberOfTrailingZeros(bucketSize);
+    final int rem = bucketSize - 1;
+    return () -> new FrontCodedIndexed(
+        orderedBuffer,
+        bucketSize,
+        numBuckets,
+        (numValues & rem) == 0 ? bucketSize : numValues & rem,
+        hasNull,
+        numValues + adjustIndex,
+        adjustIndex,
+        div,
+        rem,
+        offsetsPosition,
+        offsetsPosition + ((numBuckets - 1) * Integer.BYTES)
+    );
+  }
+
+  private final ByteBuffer buffer;
+  private final int adjustedNumValues;
+  private final int adjustIndex;
+  private final int bucketSize;
+  private final int numBuckets;
+  private final int div;
+  private final int rem;
+  private final int offsetsPosition;
+  private final int bucketsPosition;
+  private final boolean hasNull;
+  private final int lastBucketNumValues;
+
+  private FrontCodedIndexed(
+      ByteBuffer buffer,
+      int bucketSize,
+      int numBuckets,
+      int lastBucketNumValues,
+      boolean hasNull,
+      int adjustedNumValues,
+      int adjustIndex,
+      int div,
+      int rem,
+      int offsetsPosition,
+      int bucketsPosition
+  )
+  {
+    if (Integer.bitCount(bucketSize) != 1) {
+      throw new ISE("bucketSize must be a power of two but was[%,d]", 
bucketSize);
+    }
+    this.buffer = buffer.asReadOnlyBuffer().order(buffer.order());
+    this.bucketSize = bucketSize;
+    this.hasNull = hasNull;
+    this.adjustedNumValues = adjustedNumValues;
+    this.adjustIndex = adjustIndex;
+    this.div = div;
+    this.rem = rem;
+    this.numBuckets = numBuckets;
+    this.offsetsPosition = offsetsPosition;
+    this.bucketsPosition = bucketsPosition;
+    this.lastBucketNumValues = lastBucketNumValues;
+  }
+
+  @Override
+  public int size()
+  {
+    return adjustedNumValues;
+  }
+
+  @Nullable
+  @Override
+  public ByteBuffer get(int index)
+  {
+    if (hasNull && index == 0) {
+      return null;
+    }
+
+    // due to vbyte encoding, the null value is not actually stored in the 
bucket (no negative values), so we adjust
+    // the index
+    final int adjustedIndex = index - adjustIndex;
+    // find the bucket which contains the value with maths
+    final int bucket = adjustedIndex >> div;
+    final int bucketIndex = adjustedIndex & rem;
+    final int offset = getBucketOffset(bucket);
+    buffer.position(offset);
+    return getFromBucket(buffer, bucketIndex);
+  }
+
+  @Override
+  public int indexOf(@Nullable ByteBuffer value)
+  {
+    // performs binary search using the first values of each bucket to locate 
the appropriate bucket, and then does
+    // a linear scan to find the value within the bucket
+    if (value == null) {
+      return hasNull ? 0 : -1;
+    }
+
+    int minBucketIndex = 0;
+    int maxBucketIndex = numBuckets - 1;
+    while (minBucketIndex < maxBucketIndex) {
+      int currentBucket = (minBucketIndex + maxBucketIndex) >>> 1;
+      int currBucketFirstValueIndex = currentBucket * bucketSize;
+
+      // compare against first value in "current" bucket
+      final int offset = getBucketOffset(currentBucket);
+      buffer.position(offset);
+      final int firstLength = VByte.readInt(buffer);
+      final int firstOffset = buffer.position();
+      int comparison = compareBucketFirstValue(buffer, firstLength, value);
+      // save the length of the shared prefix with the first value of the 
bucket and the value to match so we
+      // can use it later to skip over all values in the bucket that share a 
longer prefix with the first value
+      // (the bucket is sorted, so the prefix length gets smaller as values 
increase)
+      final int sharedPrefix = buffer.position() - firstOffset;
+      if (comparison == 0) {
+        if (firstLength == value.remaining()) {
+          // it turns out that the first value in current bucket is what we 
are looking for, short circuit
+          return currBucketFirstValueIndex + adjustIndex;
+        } else {
+          comparison = Integer.compare(firstLength, value.remaining());
+        }
+      }
+
+      // we also compare against the adjacent bucket to determine if the value 
is actually in this bucket or
+      // if we need to keep searching buckets
+      final int nextOffset = getBucketOffset(currentBucket + 1);

Review Comment:
   Should we check whether we're already on the last bucket, to prevent reading past the end of the buffer?



##########
processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexed.java:
##########
@@ -0,0 +1,491 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.data;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * {@link Indexed} specialized for storing variable-width binary values (such 
as utf8 encoded strings), which must be
+ * sorted and unique, using 'front coding'. Front coding is a type of delta 
encoding for byte arrays, where sorted
+ * values are grouped into buckets. The first value of the bucket is written 
entirely, and remaining values are stored
+ * as a pair of an integer which indicates how much of the first byte array of 
the bucket to use as a prefix, followed
+ * by the remaining bytes after the prefix to complete the value.
+ *
+ * Getting a value first picks the appropriate bucket, finds its offset in the 
underlying buffer, then scans the bucket
+ * values to seek to the correct position of the value within the bucket in 
order to reconstruct it using the prefix
+ * length.
+ *
+ * Finding the index of a value involves binary searching the first values of 
each bucket to find the correct bucket,
+ * then a linear scan within the bucket to find the matching value (or 
negative insertion point -1 for values that
+ * are not present).
+ *
+ * The value iterator reads an entire bucket at a time, reconstructing the 
values into an array to iterate within the
+ * bucket before moving onto the next bucket as the iterator is consumed.
+ */
+public final class FrontCodedIndexed implements Indexed<ByteBuffer>
+{
+  public static Supplier<FrontCodedIndexed> read(ByteBuffer buffer, ByteOrder 
ordering)
+  {
+    final ByteBuffer orderedBuffer = buffer.asReadOnlyBuffer().order(ordering);
+    final byte version = orderedBuffer.get();
+    Preconditions.checkArgument(version == 0, "only V0 exists, encountered " + 
version);
+    final int bucketSize = orderedBuffer.get();
+    final boolean hasNull = NullHandling.IS_NULL_BYTE == orderedBuffer.get();
+    final int numValues = VByte.readInt(orderedBuffer);
+    // size of offsets + values
+    final int size = VByte.readInt(orderedBuffer);
+    final int offsetsPosition = orderedBuffer.position();
+    // move position to end of buffer
+    buffer.position(offsetsPosition + size);
+
+    final int numBuckets = (int) Math.ceil((double) numValues / (double) 
bucketSize);
+    final int adjustIndex = hasNull ? 1 : 0;
+    final int div = Integer.numberOfTrailingZeros(bucketSize);
+    final int rem = bucketSize - 1;
+    return () -> new FrontCodedIndexed(
+        orderedBuffer,
+        bucketSize,
+        numBuckets,
+        (numValues & rem) == 0 ? bucketSize : numValues & rem,
+        hasNull,
+        numValues + adjustIndex,
+        adjustIndex,
+        div,
+        rem,
+        offsetsPosition,
+        offsetsPosition + ((numBuckets - 1) * Integer.BYTES)
+    );
+  }
+
+  private final ByteBuffer buffer;
+  private final int adjustedNumValues;
+  private final int adjustIndex;
+  private final int bucketSize;
+  private final int numBuckets;
+  private final int div;
+  private final int rem;
+  private final int offsetsPosition;
+  private final int bucketsPosition;
+  private final boolean hasNull;
+  private final int lastBucketNumValues;
+
+  private FrontCodedIndexed(
+      ByteBuffer buffer,
+      int bucketSize,
+      int numBuckets,
+      int lastBucketNumValues,
+      boolean hasNull,
+      int adjustedNumValues,
+      int adjustIndex,
+      int div,
+      int rem,
+      int offsetsPosition,
+      int bucketsPosition
+  )
+  {
+    if (Integer.bitCount(bucketSize) != 1) {
+      throw new ISE("bucketSize must be a power of two but was[%,d]", 
bucketSize);
+    }
+    this.buffer = buffer.asReadOnlyBuffer().order(buffer.order());
+    this.bucketSize = bucketSize;
+    this.hasNull = hasNull;
+    this.adjustedNumValues = adjustedNumValues;
+    this.adjustIndex = adjustIndex;
+    this.div = div;
+    this.rem = rem;
+    this.numBuckets = numBuckets;
+    this.offsetsPosition = offsetsPosition;
+    this.bucketsPosition = bucketsPosition;
+    this.lastBucketNumValues = lastBucketNumValues;
+  }
+
+  @Override
+  public int size()
+  {
+    return adjustedNumValues;
+  }
+
+  @Nullable
+  @Override
+  public ByteBuffer get(int index)
+  {
+    if (hasNull && index == 0) {
+      return null;
+    }
+
+    // due to vbyte encoding, the null value is not actually stored in the 
bucket (no negative values), so we adjust
+    // the index
+    final int adjustedIndex = index - adjustIndex;
+    // find the bucket which contains the value with maths
+    final int bucket = adjustedIndex >> div;
+    final int bucketIndex = adjustedIndex & rem;
+    final int offset = getBucketOffset(bucket);
+    buffer.position(offset);
+    return getFromBucket(buffer, bucketIndex);
+  }
+
+  @Override
+  public int indexOf(@Nullable ByteBuffer value)
+  {
+    // performs binary search using the first values of each bucket to locate 
the appropriate bucket, and then does
+    // a linear scan to find the value within the bucket
+    if (value == null) {
+      return hasNull ? 0 : -1;
+    }
+
+    int minBucketIndex = 0;
+    int maxBucketIndex = numBuckets - 1;
+    while (minBucketIndex < maxBucketIndex) {
+      int currentBucket = (minBucketIndex + maxBucketIndex) >>> 1;
+      int currBucketFirstValueIndex = currentBucket * bucketSize;
+
+      // compare against first value in "current" bucket
+      final int offset = getBucketOffset(currentBucket);
+      buffer.position(offset);
+      final int firstLength = VByte.readInt(buffer);
+      final int firstOffset = buffer.position();
+      int comparison = compareBucketFirstValue(buffer, firstLength, value);
+      // save the length of the shared prefix with the first value of the 
bucket and the value to match so we
+      // can use it later to skip over all values in the bucket that share a 
longer prefix with the first value
+      // (the bucket is sorted, so the prefix length gets smaller as values 
increase)
+      final int sharedPrefix = buffer.position() - firstOffset;
+      if (comparison == 0) {
+        if (firstLength == value.remaining()) {
+          // it turns out that the first value in current bucket is what we 
are looking for, short circuit
+          return currBucketFirstValueIndex + adjustIndex;
+        } else {
+          comparison = Integer.compare(firstLength, value.remaining());
+        }
+      }
+
+      // we also compare against the adjacent bucket to determine if the value 
is actually in this bucket or
+      // if we need to keep searching buckets
+      final int nextOffset = getBucketOffset(currentBucket + 1);
+      buffer.position(nextOffset);
+      final int nextLength = VByte.readInt(buffer);
+      int comparisonNext = compareBucketFirstValue(buffer, nextLength, value);
+      if (comparisonNext == 0) {
+        if (nextLength == value.remaining()) {
+          // it turns out that the first value in next bucket is what we are 
looking for, go ahead and short circuit
+          // for that as well, even though we weren't going to scan that 
bucket on this iteration...
+          return (currBucketFirstValueIndex + adjustIndex) + bucketSize;
+        } else {
+          comparisonNext = Integer.compare(nextLength, value.remaining());
+        }
+      }
+
+      if (comparison < 0 && comparisonNext > 0) {
+        // this is exactly the right bucket
+        // find the value in the bucket (or where it would be if it were 
present)
+        buffer.position(firstOffset + firstLength);
+
+        return findValueInBucket(value, currBucketFirstValueIndex, bucketSize, 
sharedPrefix);
+      } else if (comparison < 0) {
+        minBucketIndex = currentBucket + 1;
+      } else {
+        maxBucketIndex = currentBucket - 1;
+      }
+    }
+
+    // this is where we ended up, try to find the value in the bucket
+    final int bucketIndexBase = minBucketIndex * bucketSize;
+    final int numValuesInBucket;
+    if (minBucketIndex == numBuckets - 1) {
+      numValuesInBucket = lastBucketNumValues;
+    } else {
+      numValuesInBucket = bucketSize;
+    }
+    final int offset = getBucketOffset(minBucketIndex);
+
+    // like we did in the loop, except comparison being smaller the first 
value here is a short circuit
+    buffer.position(offset);
+    final int firstLength = VByte.readInt(buffer);
+    final int firstOffset = buffer.position();
+    int comparison = compareBucketFirstValue(buffer, firstLength, value);
+    final int sharedPrefix = buffer.position() - firstOffset;
+    if (comparison == 0) {
+      if (firstLength == value.remaining()) {
+        // it turns out that the first value in current bucket is what we are 
looking for, short circuit
+        return bucketIndexBase + adjustIndex;
+      } else {
+        comparison = Integer.compare(firstLength, value.remaining());
+      }
+    }
+
+    if (comparison > 0) {
+      // value preceedes bucket, so bail out
+      return -(bucketIndexBase + adjustIndex) - 1;
+    }
+
+    buffer.position(firstOffset + firstLength);
+
+    return findValueInBucket(value, bucketIndexBase, numValuesInBucket, 
sharedPrefix);
+  }
+
+  @Override
+  public Iterator<ByteBuffer> iterator()
+  {
+    ByteBuffer copy = buffer.asReadOnlyBuffer().order(buffer.order());
+    copy.position(bucketsPosition);
+    final ByteBuffer[] firstBucket = readBucket(copy, numBuckets > 1 ? 
bucketSize : lastBucketNumValues);
+    // iterator decodes and buffers a bucket at a time, paging through buckets 
as the iterator is consumed
+    return new Iterator<ByteBuffer>()
+    {
+      private int currIndex = 0;
+      private int currentBucketIndex = 0;
+      private ByteBuffer[] currentBucket = firstBucket;
+
+      @Override
+      public boolean hasNext()
+      {
+        return currIndex < adjustedNumValues;
+      }
+
+      @Override
+      public ByteBuffer next()
+      {
+        // null is handled special
+        if (hasNull && currIndex == 0) {
+          currIndex++;
+          return null;
+        }
+        if (!hasNext()) {
+          throw new NoSuchElementException();
+        }
+        final int adjustedCurrIndex = hasNull ? currIndex - 1 : currIndex;
+        final int bucketNum = adjustedCurrIndex >> div;
+        // load next bucket if needed
+        if (bucketNum != currentBucketIndex) {
+          final int offset = copy.getInt(offsetsPosition + ((bucketNum - 1) * 
Integer.BYTES));
+          copy.position(bucketsPosition + offset);
+          currentBucket = readBucket(
+              copy,
+              bucketNum < (numBuckets - 1) ? bucketSize : lastBucketNumValues
+          );
+          currentBucketIndex = bucketNum;
+        }
+        int offset = adjustedCurrIndex & rem;
+        currIndex++;
+        return currentBucket[offset];
+      }
+
+      @Override
+      public void remove()
+      {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+
+  @Override
+  public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+  {
+    inspector.visit("buffer", buffer);
+    inspector.visit("hasNulls", hasNull);
+    inspector.visit("bucketSize", bucketSize);
+  }
+
+  private int getBucketOffset(int bucket)
+  {
+    // get offset of that bucket in the value buffer, subtract 1 to get the 
starting position because we only store the
+    // ending offset, so look at the ending offset of the previous bucket, or 
0 if this is the first bucket
+    return bucketsPosition + (bucket > 0 ? buffer.getInt(offsetsPosition + 
((bucket - 1) * Integer.BYTES)) : 0);
+  }
+
+
+  /**
+   * Performs an unsigned byte comparison of the first value in a bucket with 
the specified value. Note that this method
+   * MUST be prepared before calling, as it expects the length of the first 
value to have already been read externally,
+   * and the buffer position to be at the start of the first bucket value. The 
final buffer position will be the
+   * 'shared prefix length' of the first value in the bucket and the value to 
compare
+   */
+  private static int compareBucketFirstValue(ByteBuffer bucketBuffer, int 
length, ByteBuffer value)
+  {
+    final int startOffset = bucketBuffer.position();
+    final int commonLength = Math.min(length, value.remaining());
+    // save the length of the shared prefix with the first value of the bucket 
and the value to match so we
+    // can use it later to skip over all values in the bucket that share a 
longer prefix with the first value
+    // (the bucket is sorted, so the prefix length gets smaller as values 
increase)
+    int sharedPrefix;
+    int comparison = 0;
+    for (sharedPrefix = 0; sharedPrefix < commonLength; sharedPrefix++) {
+      comparison = unsignedByteCompare(bucketBuffer.get(), 
value.get(sharedPrefix));
+      if (comparison != 0) {
+        bucketBuffer.position(startOffset + sharedPrefix);
+        break;
+      }
+    }
+    return comparison;
+  }
+
+  /**
+   * Finds a value in a bucket among the fragments. The first value is assumed 
to have been already compared against
+   * and be smaller than the value we are looking for. This comparison is the 
source of the 'shared prefix', which is
+   * the length which the value has in common with the first value of the 
bucket.
+   *
+   * This method uses this shared prefix length to skip more expensive byte by 
byte full value comparisons when
+   * possible by comparing the shared prefix length with the prefix length of 
the fragment. Since the bucket is always
+   * sorted, prefix lengths shrink as you progress to higher indexes, and we 
can use this to reason that a fragment
+   * with a longer prefix length than the shared prefix will always sort 
before the value we are looking for, and values
+   * which have a shorter prefix will always be greater than the value we are 
looking for, so we only need to do a
+   * full comparison if the prefix length is the same
+   *
+   * this method modifies the position of {@link #buffer}
+   */
+  private int findValueInBucket(
+      ByteBuffer value,
+      int currBucketFirstValueIndex,
+      int bucketSize,
+      int sharedPrefix
+  )
+  {
+    int relativePosition = 0;
+    int prefixLength;
+    // scan through bucket values until we find match or compare numValues
+    int insertionPoint = 1;
+    while (++relativePosition < bucketSize) {
+      prefixLength = VByte.readInt(buffer);
+      if (prefixLength > sharedPrefix) {
+        final int skip = VByte.readInt(buffer);
+        buffer.position(buffer.position() + skip);
+        insertionPoint++;
+      } else if (prefixLength < sharedPrefix) {
+        // prefix is smaller, that means this value sorts ahead of it
+        break;
+      } else {
+        final int fragmentLength = VByte.readInt(buffer);
+        final int common = Math.min(fragmentLength, value.remaining() - 
prefixLength);
+        int fragmentComparison = 0;
+        for (int i = 0; i < common; i++) {
+          fragmentComparison = 
unsignedByteCompare(buffer.get(buffer.position() + i), value.get(prefixLength + 
i));
+          if (fragmentComparison != 0) {
+            break;
+          }
+        }
+        if (fragmentComparison == 0) {
+          fragmentComparison = Integer.compare(prefixLength + fragmentLength, 
value.remaining());
+        }
+
+        if (fragmentComparison == 0) {
+          return (currBucketFirstValueIndex + adjustIndex) + relativePosition;
+        } else if (fragmentComparison < 0) {
+          buffer.position(buffer.position() + fragmentLength);
+          insertionPoint++;
+        } else {
+          break;
+        }
+      }
+    }
+    // (-(insertion point) - 1)
+    return -(currBucketFirstValueIndex + adjustIndex) + (-(insertionPoint) - 
1);
+  }
+
+  /**
+   * Get a value from a bucket at a relative position.
+   *
+   * This method modifies the position of the buffer.
+   */
+  static ByteBuffer getFromBucket(ByteBuffer buffer, int offset)
+  {
+    int prefixPosition;
+    if (offset == 0) {
+      final int length = VByte.readInt(buffer);
+      final ByteBuffer firstValue = 
buffer.asReadOnlyBuffer().order(buffer.order());
+      firstValue.limit(firstValue.position() + length);
+      return firstValue;
+    } else {
+      final int firstLength = VByte.readInt(buffer);
+      prefixPosition = buffer.position();
+      buffer.position(buffer.position() + firstLength);
+    }
+    int pos = 0;
+    int prefixLength;
+    int fragmentLength;
+    int fragmentPosition;
+    // scan through bucket values until we reach offset
+    do {
+      prefixLength = VByte.readInt(buffer);
+      if (++pos < offset) {
+        // not there yet, no need to read anything other than the length to 
skip ahead
+        final int skipLength = VByte.readInt(buffer);
+        buffer.position(buffer.position() + skipLength);
+      } else {
+        // we've reached our destination
+        fragmentLength = VByte.readInt(buffer);
+        fragmentPosition = buffer.position();
+        break;
+      }
+    } while (true);
+    final int valueLength = prefixLength + fragmentLength;
+    ByteBuffer value = ByteBuffer.allocate(valueLength);
+    for (int i = 0; i < valueLength; i++) {
+      if (i < prefixLength) {
+        value.put(buffer.get(prefixPosition + i));
+      } else {
+        value.put(buffer.get(fragmentPosition + i - prefixLength));
+      }
+    }
+    value.flip();
+    return value;
+  }
+
+
+  /**
+   * Read an entire bucket from a {@link ByteBuffer}, returning an array of 
reconstructed value bytes.
+   *
+   * This method modifies the position of the buffer.
+   */
+  private static ByteBuffer[] readBucket(ByteBuffer bucket, int numValues)

Review Comment:
   Nit picky: do we need a `ByteBuffer` for each value, or just a `byte[]`? 
That is, do we then do anything fancy with the values or just treat them as a 
block of bytes? Using an array halves the garbage that this method creates.
   
   Or, if the primary use of the values is as `String`s, should this return a 
`String` to avoid a conversion later?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to