clintropolis commented on a change in pull request #6016: Druid 'Shapeshifting' Columns
URL: https://github.com/apache/incubator-druid/pull/6016#discussion_r207370256
 
 

 ##########
 File path: processing/src/main/java/io/druid/segment/data/ShapeShiftingColumnSerializer.java
 ##########
 @@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.segment.data;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.primitives.Ints;
+import io.druid.java.util.common.io.smoosh.FileSmoosher;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.segment.IndexSpec;
+import io.druid.segment.data.codecs.CompressedFormEncoder;
+import io.druid.segment.data.codecs.FormEncoder;
+import io.druid.segment.data.codecs.FormMetrics;
+import io.druid.segment.serde.Serializer;
+import io.druid.segment.writeout.SegmentWriteOutMedium;
+import io.druid.segment.writeout.WriteOutBytes;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.WritableByteChannel;
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * Base serializer for {@link ShapeShiftingColumn} implementations, providing the most common functionality, such as
+ * headers, value-chunking, encoder selection, and writing out values.
+ *
+ * Encoding Selection:
+ * The intention of this base structure is that implementors of this class will analyze incoming values and aggregate
+ * facts about the data that matching {@link FormEncoder} implementations might find interesting, while storing raw,
+ * unencoded values in {@link ShapeShiftingColumnSerializer#currentChunk}. When the threshold of
+ * {@link ShapeShiftingColumnSerializer#valuesPerChunk} is reached, {@link ShapeShiftingColumnSerializer} will attempt
+ * to find the "best" encoding by first computing the encoded size with {@link FormEncoder#getEncodedSize} and then
+ * applying a modifier to scale this value in order to influence behavior when sizes are relatively close, according
+ * to the chosen {@link IndexSpec.ShapeShiftOptimizationTarget}. This effectively sways the decision towards encodings
+ * with faster decoding speed or smaller encoded size, as appropriate. Note that very often the best encoding is
+ * unambiguous and these settings don't matter; the nuanced differences in behavior of
+ * {@link IndexSpec.ShapeShiftOptimizationTarget} mainly come into play when things are close.
+ *
+ * Implementors need only supply an initialize method to allocate storage for {@code <TChunk>}, an add-value method to
+ * populate {@code <TChunk>}, a reset method to prepare {@code <TChunkMetrics>} for the next chunk after a flush, and,
+ * of course, matching {@link FormEncoder} implementations to perform the actual value encoding. Generic compression is
+ * available to {@link FormEncoder} implementations by implementing
+ * {@link io.druid.segment.data.codecs.CompressibleFormEncoder} and wrapping in a
+ * {@link io.druid.segment.data.codecs.CompressedFormEncoder} in the codec list passed to the serializer.
+ *
+ * Layout:
+ * | version (byte) | headerSize (int) | numValues (int) | numChunks (int) | logValuesPerChunk (byte) | offsetsOutSize (int) | compositionSize (int) | composition | offsets | values |
+ *
+ * @param <TChunk>        type of the container holding a chunk of raw, unencoded values
+ * @param <TChunkMetrics> type of {@link FormMetrics} collected about the current chunk to drive encoder selection
+ */
+public abstract class ShapeShiftingColumnSerializer<TChunk, TChunkMetrics extends FormMetrics> implements Serializer
+{
+  /**
+   * | version (byte) | headerSize (int) | numValues (int) | numChunks (int) | logValuesPerChunk (byte) | offsetsOutSize (int) | compositionSize (int) |
+   */
+  private static final int BASE_HEADER_BYTES = 1 + (3 * Integer.BYTES) + 1 + (2 * Integer.BYTES);
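+  // (= 1 + 12 + 1 + 8 = 22 bytes; the variable-size per-codec composition entries are added in getHeaderSize())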
+
+  private static final Logger log = new Logger(ShapeShiftingColumnSerializer.class);
+
+  protected final SegmentWriteOutMedium segmentWriteOutMedium;
+  protected final FormEncoder<TChunk, TChunkMetrics>[] codecs;
+  protected final byte version;
+  protected final byte logValuesPerChunk;
+  protected final int valuesPerChunk;
+  protected final ByteBuffer intToBytesHelperBuffer;
+  protected final Map<FormEncoder, Integer> composition;
+  protected final IndexSpec.ShapeShiftOptimizationTarget optimizationTarget;
+  protected WriteOutBytes offsetsOut;
+  protected WriteOutBytes valuesOut;
+  protected boolean wroteFinalOffset = false;
+  protected TChunkMetrics chunkMetrics;
+  protected TChunk currentChunk;
+  protected int currentChunkPos = 0;
+  protected int numChunks = 0;
+  protected int numValues = 0;
+
+  public ShapeShiftingColumnSerializer(
+      final SegmentWriteOutMedium segmentWriteOutMedium,
+      final FormEncoder<TChunk, TChunkMetrics>[] codecs,
+      final IndexSpec.ShapeShiftOptimizationTarget optimizationTarget,
+      final IndexSpec.ShapeShiftBlockSize blockSize,
+      final int logBytesPerValue,
+      final byte version,
+      @Nullable final ByteOrder overrideByteOrder,
+      @Nullable final Byte overrideLogValuesPerChunk
+  )
+  {
+    Preconditions.checkArgument(codecs.length > 0, "must have at least one encoder");
+    this.segmentWriteOutMedium = Preconditions.checkNotNull(segmentWriteOutMedium, "segmentWriteOutMedium");
+    this.version = version;
+    this.logValuesPerChunk = overrideLogValuesPerChunk != null
+                             ? overrideLogValuesPerChunk
+                             : (byte) (blockSize.getLogBlockSize() - logBytesPerValue);
+    this.valuesPerChunk = 1 << logValuesPerChunk;
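+    // e.g. a 64KB (2^16 byte) block of 4-byte (2^2) values gives logValuesPerChunk = 14, i.e. 16384 values per chunk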
+    this.codecs = codecs;
+    this.optimizationTarget = optimizationTarget;
+    ByteOrder byteOrder = overrideByteOrder == null ? ByteOrder.nativeOrder() : overrideByteOrder;
+    this.intToBytesHelperBuffer = ByteBuffer.allocate(Integer.BYTES).order(byteOrder);
+    this.composition = Maps.newHashMap();
+  }
+
+  public void open() throws IOException
+  {
+    offsetsOut = segmentWriteOutMedium.makeWriteOutBytes();
+    valuesOut = segmentWriteOutMedium.makeWriteOutBytes();
+    initializeChunk();
+    resetChunkCollector();
+  }
+
+  /**
+   * Initialize/allocate {@link ShapeShiftingColumnSerializer#currentChunk} to hold unencoded chunk values until
+   * {@link ShapeShiftingColumnSerializer#flushCurrentChunk()} is performed.
+   */
+  public abstract void initializeChunk();
+
+  /**
+   * Reset {@link ShapeShiftingColumnSerializer#chunkMetrics} to prepare for analyzing the next incoming chunk of data
+   * after performing {@link ShapeShiftingColumnSerializer#flushCurrentChunk()}.
+   */
+  public abstract void resetChunkCollector();
+
+  @Override
+  public long getSerializedSize() throws IOException
+  {
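+    // note: computing the size finalizes the column, flushing any partial chunk and writing the final offset;
+    // both operations are guarded, so a subsequent writeTo will not repeat them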
+    if (currentChunkPos > 0) {
+      flushCurrentChunk();
+    }
+
+    writeFinalOffset();
+
+    return getHeaderSize() + offsetsOut.size() + valuesOut.size();
+  }
+
+  @Override
+  public void writeTo(
+      final WritableByteChannel channel,
+      final FileSmoosher smoosher
+  ) throws IOException
+  {
+    if (currentChunkPos > 0) {
+      flushCurrentChunk();
+    }
+
+    writeFinalOffset();
+
+    writeShapeShiftHeader(
+        channel,
+        intToBytesHelperBuffer,
+        version,
+        numChunks,
+        numValues,
+        logValuesPerChunk,
+        composition,
+        Ints.checkedCast(offsetsOut.size())
+    );
+    offsetsOut.writeTo(channel);
+    valuesOut.writeTo(channel);
+  }
+
+  protected ByteBuffer toBytes(final int n)
+  {
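+    // note: reuses the shared helper buffer, so the returned buffer must be consumed before the next call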
+    intToBytesHelperBuffer.putInt(0, n);
+    intToBytesHelperBuffer.rewind();
+    return intToBytesHelperBuffer;
+  }
+
+  /**
+   * Encode values of {@link ShapeShiftingColumnSerializer#currentChunk} with the 'best' available {@link FormEncoder}
+   * given the information collected in {@link ShapeShiftingColumnSerializer#chunkMetrics}. The best is chosen by
+   * computing the smallest 'modified' size, where {@link FormEncoder#getSpeedModifier(FormMetrics)} is tuned based
+   * on decoding speed for each encoding in relation to all other available encodings.
+   *
+   * @throws IOException if writing offsets or encoded values to the write-out medium fails
+   */
+  void flushCurrentChunk() throws IOException
+  {
+    Preconditions.checkState(!wroteFinalOffset, "!wroteFinalOffset");
+    Preconditions.checkState(currentChunkPos > 0, "currentChunkPos > 0");
+    Preconditions.checkState(offsetsOut.isOpen(), "offsetsOut.isOpen");
+    Preconditions.checkState(valuesOut.isOpen(), "valuesOut.isOpen");
+
+    offsetsOut.write(toBytes(Ints.checkedCast(valuesOut.size())));
+
+    int bestSize = Integer.MAX_VALUE;
+    FormEncoder<TChunk, TChunkMetrics> bestCodec = null;
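+    // the raw-size check below is a cheap first filter; it assumes speed modifiers are >= 1.0, so a codec whose raw
+    // size already exceeds the best modified size so far cannot produce a smaller modified size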
+    if (codecs.length > 1) {
+      for (FormEncoder<TChunk, TChunkMetrics> codec : codecs) {
+        int theSize = codec.getEncodedSize(currentChunk, currentChunkPos, chunkMetrics);
+        if (theSize < bestSize) {
+          double modified = (double) theSize * codec.getSpeedModifier(chunkMetrics);
+          if (modified < bestSize) {
+            bestCodec = codec;
+            bestSize = (int) modified;
+          }
+        }
+      }
+    } else {
+      bestCodec = codecs[0];
+    }
+
+    if (bestCodec == null) {
+      throw new RuntimeException("WTF? Unable to select an encoder.");
+    }
+
+    composition.merge(bestCodec, 1, Integer::sum);
+    if (bestCodec instanceof CompressedFormEncoder) {
+      // also count the inner encoder so the composition reflects the underlying form
+      composition.merge(((CompressedFormEncoder) bestCodec).getInnerEncoder(), 1, Integer::sum);
+    }
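+    // prefix the chunk with the winning codec's 1-byte header so the decoder can pick the matching form at read time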
+    valuesOut.write(new byte[]{bestCodec.getHeader()});
+    bestCodec.encode(valuesOut, currentChunk, currentChunkPos, chunkMetrics);
+
+    numChunks++;
+    resetChunk();
+  }
+
+  private void resetChunk()
+  {
+    currentChunkPos = 0;
+    resetChunkCollector();
+  }
+
+  private void writeFinalOffset() throws IOException
+  {
+    if (!wroteFinalOffset) {
+      offsetsOut.write(toBytes(Ints.checkedCast(valuesOut.size())));
+      wroteFinalOffset = true;
+    }
+  }
+
+  private int getHeaderSize()
+  {
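+    // each composition entry appears to occupy 5 bytes: a 1-byte codec header plus a 4-byte chunk count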
+    return BASE_HEADER_BYTES + (composition.size() * 5);
+  }
+
+  static void writeShapeShiftHeader(
 
 Review comment:
   Static by accident, updated as part of slight header refactor
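
For context, the extension contract described in the class Javadoc (allocate chunk storage, add values, reset metrics) can be sketched roughly as follows. This is illustrative only, not code from this PR: the subclass name, the IntFormMetrics type, and its collect(int) method are hypothetical.

    // A minimal implementor sketch, assuming a hypothetical IntFormMetrics.
    // It would live in io.druid.segment.data so it can call the package-private flushCurrentChunk().
    public class IntShapeShiftingColumnSerializer
        extends ShapeShiftingColumnSerializer<int[], IntFormMetrics>
    {
      public IntShapeShiftingColumnSerializer(
          final SegmentWriteOutMedium medium,
          final FormEncoder<int[], IntFormMetrics>[] codecs,
          final IndexSpec.ShapeShiftOptimizationTarget target,
          final IndexSpec.ShapeShiftBlockSize blockSize
      )
      {
        // 2 = log2 of Integer.BYTES; the version byte is arbitrary for this sketch, and no overrides are supplied
        super(medium, codecs, target, blockSize, 2, (byte) 0x1, null, null);
      }

      @Override
      public void initializeChunk()
      {
        // allocate storage for raw, unencoded values
        currentChunk = new int[valuesPerChunk];
      }

      @Override
      public void resetChunkCollector()
      {
        // fresh metrics for analyzing the next chunk
        chunkMetrics = new IntFormMetrics();
      }

      // the per-value add method the class Javadoc says implementors supply
      public void addValue(final int value) throws IOException
      {
        if (currentChunkPos == valuesPerChunk) {
          flushCurrentChunk();
        }
        chunkMetrics.collect(value);
        currentChunk[currentChunkPos++] = value;
        numValues++;
      }
    }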

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org
