This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 3711c0d987 Reduce heap footprint of GenericIndexed. (#14563)
3711c0d987 is described below

commit 3711c0d987213a0086053916a23775768f6f0a6b
Author: Gian Merlino <[email protected]>
AuthorDate: Wed Jul 12 08:11:41 2023 -0700

    Reduce heap footprint of GenericIndexed. (#14563)
    
    Two changes:
    
    1) Intern DecompressingByteBufferObjectStrategy. Saves ~32 bytes per column.
    
    2) Split GenericIndexed into GenericIndexed.V1 and GenericIndexed.V2. The
       major benefit here is isolating out the ByteBuffers that are only needed
       for V2. This saves ~80 bytes for V1 (one buffer instead of two).
---
 .../data/BlockLayoutColumnarDoublesSupplier.java   |   2 +-
 .../data/BlockLayoutColumnarFloatsSupplier.java    |   2 +-
 .../data/BlockLayoutColumnarLongsSupplier.java     |   2 +-
 .../data/CompressedColumnarIntsSupplier.java       |   4 +-
 .../data/CompressedVSizeColumnarIntsSupplier.java  |   4 +-
 .../DecompressingByteBufferObjectStrategy.java     |  23 +-
 .../apache/druid/segment/data/GenericIndexed.java  | 493 ++++++++++-----------
 7 files changed, 261 insertions(+), 269 deletions(-)

diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java
index 98a7ab51f9..e3699ea3db 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java
@@ -45,7 +45,7 @@ public class BlockLayoutColumnarDoublesSupplier implements Supplier<ColumnarDoub
       CompressionStrategy strategy
   )
   {
-    baseDoubleBuffers = GenericIndexed.read(fromBuffer, new DecompressingByteBufferObjectStrategy(byteOrder, strategy));
+    baseDoubleBuffers = GenericIndexed.read(fromBuffer, DecompressingByteBufferObjectStrategy.of(byteOrder, strategy));
     this.totalSize = totalSize;
     this.sizePer = sizePer;
   }
diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSupplier.java
index 26d7c798c7..383a99b3f4 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSupplier.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSupplier.java
@@ -45,7 +45,7 @@ public class BlockLayoutColumnarFloatsSupplier implements Supplier<ColumnarFloat
       CompressionStrategy strategy
   )
   {
-    baseFloatBuffers = GenericIndexed.read(fromBuffer, new DecompressingByteBufferObjectStrategy(byteOrder, strategy));
+    baseFloatBuffers = GenericIndexed.read(fromBuffer, DecompressingByteBufferObjectStrategy.of(byteOrder, strategy));
     this.totalSize = totalSize;
     this.sizePer = sizePer;
   }
diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java
index 29a0748bce..6fe04fbd31 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java
@@ -47,7 +47,7 @@ public class BlockLayoutColumnarLongsSupplier implements Supplier<ColumnarLongs>
       CompressionStrategy strategy
   )
   {
-    baseLongBuffers = GenericIndexed.read(fromBuffer, new DecompressingByteBufferObjectStrategy(order, strategy));
+    baseLongBuffers = GenericIndexed.read(fromBuffer, DecompressingByteBufferObjectStrategy.of(order, strategy));
     this.totalSize = totalSize;
     this.sizePer = sizePer;
     this.baseReader = reader;
diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java
index e685721eb8..88e78f7980 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java
@@ -125,7 +125,7 @@ public class CompressedColumnarIntsSupplier implements WritableSupplier<Columnar
       return new CompressedColumnarIntsSupplier(
           totalSize,
           sizePer,
-          GenericIndexed.read(buffer, new DecompressingByteBufferObjectStrategy(order, compression)),
+          GenericIndexed.read(buffer, DecompressingByteBufferObjectStrategy.of(order, compression)),
           compression
       );
     }
@@ -148,7 +148,7 @@ public class CompressedColumnarIntsSupplier implements WritableSupplier<Columnar
       return new CompressedColumnarIntsSupplier(
           totalSize,
           sizePer,
-          GenericIndexed.read(buffer, new DecompressingByteBufferObjectStrategy(order, compression), mapper),
+          GenericIndexed.read(buffer, DecompressingByteBufferObjectStrategy.of(order, compression), mapper),
           compression
       );
     }
diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java
index e63d6702f8..503d4f65fe 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java
@@ -158,7 +158,7 @@ public class CompressedVSizeColumnarIntsSupplier implements WritableSupplier<Col
           totalSize,
           sizePer,
           numBytes,
-          GenericIndexed.read(buffer, new DecompressingByteBufferObjectStrategy(order, compression)),
+          GenericIndexed.read(buffer, DecompressingByteBufferObjectStrategy.of(order, compression)),
           compression
       );
 
@@ -186,7 +186,7 @@ public class CompressedVSizeColumnarIntsSupplier implements WritableSupplier<Col
           totalSize,
           sizePer,
           numBytes,
-          GenericIndexed.read(buffer, new DecompressingByteBufferObjectStrategy(order, compression), mapper),
+          GenericIndexed.read(buffer, DecompressingByteBufferObjectStrategy.of(order, compression), mapper),
           compression
       );
 
diff --git a/processing/src/main/java/org/apache/druid/segment/data/DecompressingByteBufferObjectStrategy.java b/processing/src/main/java/org/apache/druid/segment/data/DecompressingByteBufferObjectStrategy.java
index 9c77d884b7..72db63b1a1 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/DecompressingByteBufferObjectStrategy.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/DecompressingByteBufferObjectStrategy.java
@@ -20,22 +20,43 @@
 package org.apache.druid.segment.data;
 
 import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.segment.CompressedPools;
 
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class DecompressingByteBufferObjectStrategy implements ObjectStrategy<ResourceHolder<ByteBuffer>>
 {
+  /**
+   * Cache strategies in a static, because there are not very many distinct ones -- there are only so many combinations
+   * of byte order and decompressor that we can possibly have -- and we need one of these per GenericIndexed, which
+   * is a class that we tend to have tons of in heap.
+   */
+  private static final ConcurrentHashMap<Pair<ByteOrder, CompressionStrategy>, DecompressingByteBufferObjectStrategy> STRATEGIES =
+      new ConcurrentHashMap<>();
+
   private final ByteOrder order;
   private final CompressionStrategy.Decompressor decompressor;
 
-  DecompressingByteBufferObjectStrategy(ByteOrder order, CompressionStrategy compression)
+  private DecompressingByteBufferObjectStrategy(ByteOrder order, CompressionStrategy compression)
   {
     this.order = order;
     this.decompressor = compression.getDecompressor();
   }
 
+  public static DecompressingByteBufferObjectStrategy of(
+      final ByteOrder order,
+      final CompressionStrategy compression
+  )
+  {
+    return STRATEGIES.computeIfAbsent(
+        Pair.of(order, compression),
+        pair -> new DecompressingByteBufferObjectStrategy(pair.lhs, pair.rhs)
+    );
+  }
+
   @Override
   @SuppressWarnings("unchecked")
   public Class<ResourceHolder<ByteBuffer>> getClazz()
diff --git a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java
index 11a8b61531..a87dcda09b 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java
@@ -82,7 +82,7 @@ import java.util.Iterator;
  *
  * @see GenericIndexedWriter
  */
-public class GenericIndexed<T> implements CloseableIndexed<T>, Serializer
+public abstract class GenericIndexed<T> implements CloseableIndexed<T>, Serializer
 {
   static final byte VERSION_ONE = 0x1;
   static final byte VERSION_TWO = 0x2;
@@ -91,12 +91,6 @@ public class GenericIndexed<T> implements CloseableIndexed<T>, Serializer
 
   static final int NULL_VALUE_SIZE_MARKER = -1;
 
-  private static final MetaSerdeHelper<GenericIndexed> META_SERDE_HELPER = MetaSerdeHelper
-      .firstWriteByte((GenericIndexed x) -> VERSION_ONE)
-      .writeByte(x -> x.allowReverseLookup ? REVERSE_LOOKUP_ALLOWED : REVERSE_LOOKUP_DISALLOWED)
-      .writeInt(x -> Ints.checkedCast(x.theBuffer.remaining() + (long) Integer.BYTES))
-      .writeInt(x -> x.size);
-
   private static final SerializerUtils SERIALIZER_UTILS = new SerializerUtils();
 
   /**
@@ -220,7 +214,7 @@ public class GenericIndexed<T> implements CloseableIndexed<T>, Serializer
         buffers,
         GenericIndexedWriter.compressedByteBuffersWriteObjectStrategy(compression, bufferSize, closer),
         false,
-        new DecompressingByteBufferObjectStrategy(order, compression)
+        DecompressingByteBufferObjectStrategy.of(order, compression)
     );
   }
 
@@ -238,75 +232,243 @@ public class GenericIndexed<T> implements CloseableIndexed<T>, Serializer
     return numberOfFilesRequired;
   }
 
+  protected final ObjectStrategy<T> strategy;
+  protected final boolean allowReverseLookup;
+  protected final int size;
 
-  private final boolean versionOne;
+  public GenericIndexed(
+      final ObjectStrategy<T> strategy,
+      final boolean allowReverseLookup,
+      final int size
+  )
+  {
+    this.strategy = strategy;
+    this.allowReverseLookup = allowReverseLookup;
+    this.size = size;
+  }
 
-  private final ObjectStrategy<T> strategy;
-  private final boolean allowReverseLookup;
-  private final int size;
-  private final ByteBuffer headerBuffer;
+  public abstract BufferIndexed singleThreaded();
 
-  private final ByteBuffer firstValueBuffer;
+  @Override
+  public abstract long getSerializedSize();
+
+  private static final class V1<T> extends GenericIndexed<T>
+  {
+    @SuppressWarnings("rawtypes")
+    private static final MetaSerdeHelper<GenericIndexed.V1> META_SERDE_HELPER = MetaSerdeHelper
+        .firstWriteByte((GenericIndexed.V1 x) -> VERSION_ONE)
+        .writeByte(x -> x.allowReverseLookup ? REVERSE_LOOKUP_ALLOWED : REVERSE_LOOKUP_DISALLOWED)
+        .writeInt(x -> Ints.checkedCast(x.theBuffer.remaining() + (long) Integer.BYTES))
+        .writeInt(x -> x.size);
+
+    private final ByteBuffer theBuffer;
+    private final int headerOffset;
+    private final int valuesOffset;
+
+    V1(
+        final ByteBuffer buffer,
+        final ObjectStrategy<T> strategy,
+        final boolean allowReverseLookup
+    )
+    {
+      super(strategy, allowReverseLookup, buffer.getInt());
+      this.theBuffer = buffer;
+      this.headerOffset = theBuffer.position();
+      this.valuesOffset = theBuffer.position() + size * Integer.BYTES;
+    }
 
-  private final ByteBuffer[] valueBuffers;
-  private int logBaseTwoOfElementsPerValueFile;
-  private int relativeIndexMask;
+    @Nullable
+    @Override
+    public T get(int index)
+    {
+      checkIndex(index);
 
-  @Nullable
-  private final ByteBuffer theBuffer;
+      final int startOffset;
+      final int endOffset;
 
-  /**
-   * Constructor for version one.
-   */
-  GenericIndexed(
-      ByteBuffer buffer,
-      ObjectStrategy<T> strategy,
-      boolean allowReverseLookup
-  )
-  {
-    this.versionOne = true;
+      if (index == 0) {
+        startOffset = Integer.BYTES;
+        endOffset = theBuffer.getInt(headerOffset);
+      } else {
+        int headerPosition = (index - 1) * Integer.BYTES;
+        startOffset = theBuffer.getInt(headerOffset + headerPosition) + Integer.BYTES;
+        endOffset = theBuffer.getInt(headerOffset + headerPosition + Integer.BYTES);
+      }
+      return copyBufferAndGet(theBuffer, valuesOffset + startOffset, valuesOffset + endOffset);
+    }
 
-    this.theBuffer = buffer;
-    this.strategy = strategy;
-    this.allowReverseLookup = allowReverseLookup;
-    size = theBuffer.getInt();
+    @Override
+    public BufferIndexed singleThreaded()
+    {
+      final ByteBuffer copyBuffer = theBuffer.asReadOnlyBuffer();
+      return new BufferIndexed()
+      {
+        @Nullable
+        @Override
+        protected ByteBuffer getByteBuffer(final int index)
+        {
+          checkIndex(index);
+
+          final int startOffset;
+          final int endOffset;
+
+          if (index == 0) {
+            startOffset = Integer.BYTES;
+            endOffset = theBuffer.getInt(headerOffset);
+          } else {
+            int headerPosition = (index - 1) * Integer.BYTES;
+            startOffset = theBuffer.getInt(headerOffset + headerPosition) + Integer.BYTES;
+            endOffset = theBuffer.getInt(headerOffset + headerPosition + Integer.BYTES);
+          }
+          return bufferedIndexedGetByteBuffer(copyBuffer, valuesOffset + startOffset, valuesOffset + endOffset);
+        }
 
-    int indexOffset = theBuffer.position();
-    int valuesOffset = theBuffer.position() + size * Integer.BYTES;
+        @Override
+        public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+        {
+          inspector.visit("theBuffer", theBuffer);
+          inspector.visit("copyBuffer", copyBuffer);
+          inspector.visit("strategy", strategy);
+        }
+      };
+    }
 
-    buffer.position(valuesOffset);
-    // Ensure the value buffer's limit equals to capacity.
-    firstValueBuffer = buffer.slice();
-    valueBuffers = new ByteBuffer[]{firstValueBuffer};
-    buffer.position(indexOffset);
-    headerBuffer = buffer.slice();
+    @Override
+    public long getSerializedSize()
+    {
+      return META_SERDE_HELPER.size(this) + (long) theBuffer.remaining();
+    }
+
+    @Override
+    public void writeTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
+    {
+      META_SERDE_HELPER.writeTo(channel, this);
+      Channels.writeFully(channel, theBuffer.asReadOnlyBuffer());
+    }
+
+    @Override
+    public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+    {
+      inspector.visit("theBuffer", theBuffer);
+      inspector.visit("strategy", strategy);
+    }
   }
 
+  private static final class V2<T> extends GenericIndexed<T>
+  {
+    private final ByteBuffer headerBuffer;
+    private final ByteBuffer[] valueBuffers;
+    private final int logBaseTwoOfElementsPerValueFile;
+    private final int relativeIndexMask;
+
+    private V2(
+        ByteBuffer[] valueBuffs,
+        ByteBuffer headerBuff,
+        ObjectStrategy<T> strategy,
+        boolean allowReverseLookup,
+        int logBaseTwoOfElementsPerValueFile,
+        int numWritten
+    )
+    {
+      super(strategy, allowReverseLookup, numWritten);
+      this.valueBuffers = valueBuffs;
+      this.headerBuffer = headerBuff;
+      this.logBaseTwoOfElementsPerValueFile = logBaseTwoOfElementsPerValueFile;
+      this.relativeIndexMask = (1 << logBaseTwoOfElementsPerValueFile) - 1;
+      headerBuffer.order(ByteOrder.nativeOrder());
+    }
 
-  /**
-   * Constructor for version two.
-   */
-  GenericIndexed(
-      ByteBuffer[] valueBuffs,
-      ByteBuffer headerBuff,
-      ObjectStrategy<T> strategy,
-      boolean allowReverseLookup,
-      int logBaseTwoOfElementsPerValueFile,
-      int numWritten
-  )
-  {
-    this.versionOne = false;
+    @Nullable
+    @Override
+    public T get(int index)
+    {
+      checkIndex(index);
 
-    this.theBuffer = null;
-    this.strategy = strategy;
-    this.allowReverseLookup = allowReverseLookup;
-    this.valueBuffers = valueBuffs;
-    this.firstValueBuffer = valueBuffers[0];
-    this.headerBuffer = headerBuff;
-    this.size = numWritten;
-    this.logBaseTwoOfElementsPerValueFile = logBaseTwoOfElementsPerValueFile;
-    this.relativeIndexMask = (1 << logBaseTwoOfElementsPerValueFile) - 1;
-    headerBuffer.order(ByteOrder.nativeOrder());
+      final int startOffset;
+      final int endOffset;
+
+      int relativePositionOfIndex = index & relativeIndexMask;
+      if (relativePositionOfIndex == 0) {
+        int headerPosition = index * Integer.BYTES;
+        startOffset = Integer.BYTES;
+        endOffset = headerBuffer.getInt(headerPosition);
+      } else {
+        int headerPosition = (index - 1) * Integer.BYTES;
+        startOffset = headerBuffer.getInt(headerPosition) + Integer.BYTES;
+        endOffset = headerBuffer.getInt(headerPosition + Integer.BYTES);
+      }
+      int fileNum = index >> logBaseTwoOfElementsPerValueFile;
+      return copyBufferAndGet(valueBuffers[fileNum], startOffset, endOffset);
+    }
+
+    @Override
+    public BufferIndexed singleThreaded()
+    {
+      final ByteBuffer[] copyValueBuffers = new ByteBuffer[valueBuffers.length];
+      for (int i = 0; i < valueBuffers.length; i++) {
+        copyValueBuffers[i] = valueBuffers[i].asReadOnlyBuffer();
+      }
+
+      return new BufferIndexed()
+      {
+        @Nullable
+        @Override
+        protected ByteBuffer getByteBuffer(int index)
+        {
+          checkIndex(index);
+
+          final int startOffset;
+          final int endOffset;
+
+          int relativePositionOfIndex = index & relativeIndexMask;
+          if (relativePositionOfIndex == 0) {
+            int headerPosition = index * Integer.BYTES;
+            startOffset = 4;
+            endOffset = headerBuffer.getInt(headerPosition);
+          } else {
+            int headerPosition = (index - 1) * Integer.BYTES;
+            startOffset = headerBuffer.getInt(headerPosition) + Integer.BYTES;
+            endOffset = headerBuffer.getInt(headerPosition + Integer.BYTES);
+          }
+          int fileNum = index >> logBaseTwoOfElementsPerValueFile;
+          return bufferedIndexedGetByteBuffer(copyValueBuffers[fileNum], startOffset, endOffset);
+        }
+
+        @Override
+        public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+        {
+          inspector.visit("headerBuffer", headerBuffer);
+          // Inspecting just one example of copyValueBuffer, not needed to inspect the whole array, because all buffers
+          // in it are the same.
+          inspector.visit("copyValueBuffer", copyValueBuffers.length > 0 ? copyValueBuffers[0] : null);
+          inspector.visit("strategy", strategy);
+        }
+      };
+    }
+
+    @Override
+    public long getSerializedSize()
+    {
+      throw new UnsupportedOperationException("Method not supported for version 2 GenericIndexed.");
+    }
+
+    @Override
+    public void writeTo(WritableByteChannel channel, FileSmoosher smoosher)
+    {
+      throw new UnsupportedOperationException(
+          "GenericIndexed serialization for V2 is unsupported. Use GenericIndexedWriter instead.");
+    }
+
+    @Override
+    public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+    {
+      inspector.visit("headerBuffer", headerBuffer);
+
+      // Inspecting just one example of valueBuffer, not needed to inspect the whole array, because all buffers in it
+      // are the same.
+      inspector.visit("valueBuffer", valueBuffers.length > 0 ? valueBuffers[0] : null);
+      inspector.visit("strategy", strategy);
+    }
   }
 
   /**
@@ -317,7 +479,7 @@ public class GenericIndexed<T> implements CloseableIndexed<T>, Serializer
    *
    * @param index index identifying an element of an GenericIndexed.
    */
-  private void checkIndex(int index)
+  protected void checkIndex(int index)
   {
     if (index < 0) {
       throw new IAE("Index[%s] < 0", index);
@@ -338,12 +500,6 @@ public class GenericIndexed<T> implements CloseableIndexed<T>, Serializer
     return size;
   }
 
-  @Override
-  public T get(int index)
-  {
-    return versionOne ? getVersionOne(index) : getVersionTwo(index);
-  }
-
   /**
    * Returns the index of "value" in this GenericIndexed object, or (-(insertion point) - 1) if the value is not
    * present, in the manner of Arrays.binarySearch. This strengthens the contract of Indexed, which only guarantees
@@ -393,38 +549,8 @@ public class GenericIndexed<T> implements CloseableIndexed<T>, Serializer
     return IndexedIterable.create(this).iterator();
   }
 
-  @Override
-  public long getSerializedSize()
-  {
-    if (!versionOne) {
-      throw new UnsupportedOperationException("Method not supported for version 2 GenericIndexed.");
-    }
-    return getSerializedSizeVersionOne();
-  }
-
-  @Override
-  public void writeTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
-  {
-    if (versionOne) {
-      writeToVersionOne(channel);
-    } else {
-      throw new UnsupportedOperationException(
-          "GenericIndexed serialization for V2 is unsupported. Use GenericIndexedWriter instead.");
-    }
-  }
-
-  /**
-   * Create a non-thread-safe Indexed, which may perform better than the underlying Indexed.
-   *
-   * @return a non-thread-safe Indexed
-   */
-  public GenericIndexed<T>.BufferIndexed singleThreaded()
-  {
-    return versionOne ? singleThreadedVersionOne() : singleThreadedVersionTwo();
-  }
-
   @Nullable
-  private T copyBufferAndGet(ByteBuffer valueBuffer, int startOffset, int endOffset)
+  protected T copyBufferAndGet(ByteBuffer valueBuffer, int startOffset, int endOffset)
   {
     ByteBuffer copyValueBuffer = valueBuffer.asReadOnlyBuffer();
     int size = endOffset - startOffset;
@@ -439,21 +565,6 @@ public class GenericIndexed<T> implements CloseableIndexed<T>, Serializer
     return strategy.fromByteBuffer(copyValueBuffer, size);
   }
 
-  @Override
-  public void inspectRuntimeShape(RuntimeShapeInspector inspector)
-  {
-    inspector.visit("versionOne", versionOne);
-    inspector.visit("headerBuffer", headerBuffer);
-    if (versionOne) {
-      inspector.visit("firstValueBuffer", firstValueBuffer);
-    } else {
-      // Inspecting just one example of valueBuffer, not needed to inspect the whole array, because all buffers in it
-      // are the same.
-      inspector.visit("valueBuffer", valueBuffers.length > 0 ? valueBuffers[0] : null);
-    }
-    inspector.visit("strategy", strategy);
-  }
-
   /**
    * Single-threaded view.
    */
@@ -567,10 +678,6 @@ public class GenericIndexed<T> implements CloseableIndexed<T>, Serializer
     // nothing to close
   }
 
-  ///////////////
-  // VERSION ONE
-  ///////////////
-
   private static <T> GenericIndexed<T> createGenericIndexedVersionOne(ByteBuffer byteBuffer, ObjectStrategy<T> strategy)
   {
     boolean allowReverseLookup = byteBuffer.get() == REVERSE_LOOKUP_ALLOWED;
@@ -579,7 +686,7 @@ public class GenericIndexed<T> implements CloseableIndexed<T>, Serializer
     bufferToUse.limit(bufferToUse.position() + size);
     byteBuffer.position(bufferToUse.limit());
 
-    return new GenericIndexed<>(
+    return new GenericIndexed.V1<>(
         bufferToUse,
         strategy,
         allowReverseLookup
@@ -597,7 +704,7 @@ public class GenericIndexed<T> implements CloseableIndexed<T>, Serializer
     if (!objects.hasNext()) {
       final ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES).putInt(0);
       buffer.flip();
-      return new GenericIndexed<>(buffer, resultObjectStrategy, allowReverseLookup);
+      return new GenericIndexed.V1<>(buffer, resultObjectStrategy, allowReverseLookup);
     }
 
     int count = 0;
@@ -642,79 +749,9 @@ public class GenericIndexed<T> implements CloseableIndexed<T>, Serializer
     valuesOut.writeTo(theBuffer);
     theBuffer.flip();
 
-    return new GenericIndexed<>(theBuffer.asReadOnlyBuffer(), resultObjectStrategy, allowReverseLookup);
-  }
-
-  private long getSerializedSizeVersionOne()
-  {
-    return META_SERDE_HELPER.size(this) + (long) theBuffer.remaining();
-  }
-
-  @Nullable
-  private T getVersionOne(int index)
-  {
-    checkIndex(index);
-
-    final int startOffset;
-    final int endOffset;
-
-    if (index == 0) {
-      startOffset = Integer.BYTES;
-      endOffset = headerBuffer.getInt(0);
-    } else {
-      int headerPosition = (index - 1) * Integer.BYTES;
-      startOffset = headerBuffer.getInt(headerPosition) + Integer.BYTES;
-      endOffset = headerBuffer.getInt(headerPosition + Integer.BYTES);
-    }
-    return copyBufferAndGet(firstValueBuffer, startOffset, endOffset);
+    return new GenericIndexed.V1<>(theBuffer.asReadOnlyBuffer(), resultObjectStrategy, allowReverseLookup);
   }
 
-  private BufferIndexed singleThreadedVersionOne()
-  {
-    final ByteBuffer copyBuffer = firstValueBuffer.asReadOnlyBuffer();
-    return new BufferIndexed()
-    {
-      @Nullable
-      @Override
-      protected ByteBuffer getByteBuffer(final int index)
-      {
-        checkIndex(index);
-
-        final int startOffset;
-        final int endOffset;
-
-        if (index == 0) {
-          startOffset = Integer.BYTES;
-          endOffset = headerBuffer.getInt(0);
-        } else {
-          int headerPosition = (index - 1) * Integer.BYTES;
-          startOffset = headerBuffer.getInt(headerPosition) + Integer.BYTES;
-          endOffset = headerBuffer.getInt(headerPosition + Integer.BYTES);
-        }
-        return bufferedIndexedGetByteBuffer(copyBuffer, startOffset, endOffset);
-      }
-
-      @Override
-      public void inspectRuntimeShape(RuntimeShapeInspector inspector)
-      {
-        inspector.visit("headerBuffer", headerBuffer);
-        inspector.visit("copyBuffer", copyBuffer);
-        inspector.visit("strategy", strategy);
-      }
-    };
-  }
-
-  private void writeToVersionOne(WritableByteChannel channel) throws IOException
-  {
-    META_SERDE_HELPER.writeTo(channel, this);
-    Channels.writeFully(channel, theBuffer.asReadOnlyBuffer());
-  }
-
-
-  ///////////////
-  // VERSION TWO
-  ///////////////
-
   private static <T> GenericIndexed<T> createGenericIndexedVersionTwo(
       ByteBuffer byteBuffer,
       ObjectStrategy<T> strategy,
@@ -739,7 +776,7 @@ public class GenericIndexed<T> implements CloseableIndexed<T>, Serializer
         valueBuffersToUse[i] = valueBuffer.asReadOnlyBuffer();
       }
       ByteBuffer headerBuffer = fileMapper.mapFile(GenericIndexedWriter.generateHeaderFileName(columnName));
-      return new GenericIndexed<>(
+      return new GenericIndexed.V2<>(
           valueBuffersToUse,
           headerBuffer,
           strategy,
@@ -752,70 +789,4 @@ public class GenericIndexed<T> implements CloseableIndexed<T>, Serializer
       throw new RuntimeException("File mapping failed.", e);
     }
   }
-
-  @Nullable
-  private T getVersionTwo(int index)
-  {
-    checkIndex(index);
-
-    final int startOffset;
-    final int endOffset;
-
-    int relativePositionOfIndex = index & relativeIndexMask;
-    if (relativePositionOfIndex == 0) {
-      int headerPosition = index * Integer.BYTES;
-      startOffset = Integer.BYTES;
-      endOffset = headerBuffer.getInt(headerPosition);
-    } else {
-      int headerPosition = (index - 1) * Integer.BYTES;
-      startOffset = headerBuffer.getInt(headerPosition) + Integer.BYTES;
-      endOffset = headerBuffer.getInt(headerPosition + Integer.BYTES);
-    }
-    int fileNum = index >> logBaseTwoOfElementsPerValueFile;
-    return copyBufferAndGet(valueBuffers[fileNum], startOffset, endOffset);
-  }
-
-  private BufferIndexed singleThreadedVersionTwo()
-  {
-    final ByteBuffer[] copyValueBuffers = new ByteBuffer[valueBuffers.length];
-    for (int i = 0; i < valueBuffers.length; i++) {
-      copyValueBuffers[i] = valueBuffers[i].asReadOnlyBuffer();
-    }
-
-    return new BufferIndexed()
-    {
-      @Nullable
-      @Override
-      protected ByteBuffer getByteBuffer(int index)
-      {
-        checkIndex(index);
-
-        final int startOffset;
-        final int endOffset;
-
-        int relativePositionOfIndex = index & relativeIndexMask;
-        if (relativePositionOfIndex == 0) {
-          int headerPosition = index * Integer.BYTES;
-          startOffset = 4;
-          endOffset = headerBuffer.getInt(headerPosition);
-        } else {
-          int headerPosition = (index - 1) * Integer.BYTES;
-          startOffset = headerBuffer.getInt(headerPosition) + Integer.BYTES;
-          endOffset = headerBuffer.getInt(headerPosition + Integer.BYTES);
-        }
-        int fileNum = index >> logBaseTwoOfElementsPerValueFile;
-        return bufferedIndexedGetByteBuffer(copyValueBuffers[fileNum], startOffset, endOffset);
-      }
-
-      @Override
-      public void inspectRuntimeShape(RuntimeShapeInspector inspector)
-      {
-        inspector.visit("headerBuffer", headerBuffer);
-        // Inspecting just one example of copyValueBuffer, not needed to inspect the whole array, because all buffers
-        // in it are the same.
-        inspector.visit("copyValueBuffer", copyValueBuffers.length > 0 ? copyValueBuffers[0] : null);
-        inspector.visit("strategy", strategy);
-      }
-    };
-  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to