This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 3711c0d987 Reduce heap footprint of GenericIndexed. (#14563)
3711c0d987 is described below
commit 3711c0d987213a0086053916a23775768f6f0a6b
Author: Gian Merlino <[email protected]>
AuthorDate: Wed Jul 12 08:11:41 2023 -0700
Reduce heap footprint of GenericIndexed. (#14563)
Two changes:
1) Intern DecompressingByteBufferObjectStrategy. Saves ~32 bytes per column.
2) Split GenericIndexed into GenericIndexed.V1 and GenericIndexed.V2. The
major benefit here is isolating the ByteBuffers that are only needed
for V2. This saves ~80 bytes for V1 (one buffer instead of two).
---
.../data/BlockLayoutColumnarDoublesSupplier.java | 2 +-
.../data/BlockLayoutColumnarFloatsSupplier.java | 2 +-
.../data/BlockLayoutColumnarLongsSupplier.java | 2 +-
.../data/CompressedColumnarIntsSupplier.java | 4 +-
.../data/CompressedVSizeColumnarIntsSupplier.java | 4 +-
.../DecompressingByteBufferObjectStrategy.java | 23 +-
.../apache/druid/segment/data/GenericIndexed.java | 493 ++++++++++-----------
7 files changed, 261 insertions(+), 269 deletions(-)
diff --git
a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java
b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java
index 98a7ab51f9..e3699ea3db 100644
---
a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java
+++
b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java
@@ -45,7 +45,7 @@ public class BlockLayoutColumnarDoublesSupplier implements
Supplier<ColumnarDoub
CompressionStrategy strategy
)
{
- baseDoubleBuffers = GenericIndexed.read(fromBuffer, new
DecompressingByteBufferObjectStrategy(byteOrder, strategy));
+ baseDoubleBuffers = GenericIndexed.read(fromBuffer,
DecompressingByteBufferObjectStrategy.of(byteOrder, strategy));
this.totalSize = totalSize;
this.sizePer = sizePer;
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSupplier.java
b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSupplier.java
index 26d7c798c7..383a99b3f4 100644
---
a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSupplier.java
+++
b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSupplier.java
@@ -45,7 +45,7 @@ public class BlockLayoutColumnarFloatsSupplier implements
Supplier<ColumnarFloat
CompressionStrategy strategy
)
{
- baseFloatBuffers = GenericIndexed.read(fromBuffer, new
DecompressingByteBufferObjectStrategy(byteOrder, strategy));
+ baseFloatBuffers = GenericIndexed.read(fromBuffer,
DecompressingByteBufferObjectStrategy.of(byteOrder, strategy));
this.totalSize = totalSize;
this.sizePer = sizePer;
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java
b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java
index 29a0748bce..6fe04fbd31 100644
---
a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java
+++
b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java
@@ -47,7 +47,7 @@ public class BlockLayoutColumnarLongsSupplier implements
Supplier<ColumnarLongs>
CompressionStrategy strategy
)
{
- baseLongBuffers = GenericIndexed.read(fromBuffer, new
DecompressingByteBufferObjectStrategy(order, strategy));
+ baseLongBuffers = GenericIndexed.read(fromBuffer,
DecompressingByteBufferObjectStrategy.of(order, strategy));
this.totalSize = totalSize;
this.sizePer = sizePer;
this.baseReader = reader;
diff --git
a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java
b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java
index e685721eb8..88e78f7980 100644
---
a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java
+++
b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java
@@ -125,7 +125,7 @@ public class CompressedColumnarIntsSupplier implements
WritableSupplier<Columnar
return new CompressedColumnarIntsSupplier(
totalSize,
sizePer,
- GenericIndexed.read(buffer, new
DecompressingByteBufferObjectStrategy(order, compression)),
+ GenericIndexed.read(buffer,
DecompressingByteBufferObjectStrategy.of(order, compression)),
compression
);
}
@@ -148,7 +148,7 @@ public class CompressedColumnarIntsSupplier implements
WritableSupplier<Columnar
return new CompressedColumnarIntsSupplier(
totalSize,
sizePer,
- GenericIndexed.read(buffer, new
DecompressingByteBufferObjectStrategy(order, compression), mapper),
+ GenericIndexed.read(buffer,
DecompressingByteBufferObjectStrategy.of(order, compression), mapper),
compression
);
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java
b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java
index e63d6702f8..503d4f65fe 100644
---
a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java
+++
b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java
@@ -158,7 +158,7 @@ public class CompressedVSizeColumnarIntsSupplier implements
WritableSupplier<Col
totalSize,
sizePer,
numBytes,
- GenericIndexed.read(buffer, new
DecompressingByteBufferObjectStrategy(order, compression)),
+ GenericIndexed.read(buffer,
DecompressingByteBufferObjectStrategy.of(order, compression)),
compression
);
@@ -186,7 +186,7 @@ public class CompressedVSizeColumnarIntsSupplier implements
WritableSupplier<Col
totalSize,
sizePer,
numBytes,
- GenericIndexed.read(buffer, new
DecompressingByteBufferObjectStrategy(order, compression), mapper),
+ GenericIndexed.read(buffer,
DecompressingByteBufferObjectStrategy.of(order, compression), mapper),
compression
);
diff --git
a/processing/src/main/java/org/apache/druid/segment/data/DecompressingByteBufferObjectStrategy.java
b/processing/src/main/java/org/apache/druid/segment/data/DecompressingByteBufferObjectStrategy.java
index 9c77d884b7..72db63b1a1 100644
---
a/processing/src/main/java/org/apache/druid/segment/data/DecompressingByteBufferObjectStrategy.java
+++
b/processing/src/main/java/org/apache/druid/segment/data/DecompressingByteBufferObjectStrategy.java
@@ -20,22 +20,43 @@
package org.apache.druid.segment.data;
import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.java.util.common.Pair;
import org.apache.druid.segment.CompressedPools;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import java.util.concurrent.ConcurrentHashMap;
public class DecompressingByteBufferObjectStrategy implements
ObjectStrategy<ResourceHolder<ByteBuffer>>
{
+ /**
+ * Cache strategies in a static, because there are not very many distinct
ones -- there are only so many combinations
+ * of byte order and decompressor that we can possibly have -- and we need
one of these per GenericIndexed, which
+ * is a class that we tend to have tons of in heap.
+ */
+ private static final ConcurrentHashMap<Pair<ByteOrder, CompressionStrategy>,
DecompressingByteBufferObjectStrategy> STRATEGIES =
+ new ConcurrentHashMap<>();
+
private final ByteOrder order;
private final CompressionStrategy.Decompressor decompressor;
- DecompressingByteBufferObjectStrategy(ByteOrder order, CompressionStrategy
compression)
+ private DecompressingByteBufferObjectStrategy(ByteOrder order,
CompressionStrategy compression)
{
this.order = order;
this.decompressor = compression.getDecompressor();
}
+ public static DecompressingByteBufferObjectStrategy of(
+ final ByteOrder order,
+ final CompressionStrategy compression
+ )
+ {
+ return STRATEGIES.computeIfAbsent(
+ Pair.of(order, compression),
+ pair -> new DecompressingByteBufferObjectStrategy(pair.lhs, pair.rhs)
+ );
+ }
+
@Override
@SuppressWarnings("unchecked")
public Class<ResourceHolder<ByteBuffer>> getClazz()
diff --git
a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java
b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java
index 11a8b61531..a87dcda09b 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java
@@ -82,7 +82,7 @@ import java.util.Iterator;
*
* @see GenericIndexedWriter
*/
-public class GenericIndexed<T> implements CloseableIndexed<T>, Serializer
+public abstract class GenericIndexed<T> implements CloseableIndexed<T>,
Serializer
{
static final byte VERSION_ONE = 0x1;
static final byte VERSION_TWO = 0x2;
@@ -91,12 +91,6 @@ public class GenericIndexed<T> implements
CloseableIndexed<T>, Serializer
static final int NULL_VALUE_SIZE_MARKER = -1;
- private static final MetaSerdeHelper<GenericIndexed> META_SERDE_HELPER =
MetaSerdeHelper
- .firstWriteByte((GenericIndexed x) -> VERSION_ONE)
- .writeByte(x -> x.allowReverseLookup ? REVERSE_LOOKUP_ALLOWED :
REVERSE_LOOKUP_DISALLOWED)
- .writeInt(x -> Ints.checkedCast(x.theBuffer.remaining() + (long)
Integer.BYTES))
- .writeInt(x -> x.size);
-
private static final SerializerUtils SERIALIZER_UTILS = new
SerializerUtils();
/**
@@ -220,7 +214,7 @@ public class GenericIndexed<T> implements
CloseableIndexed<T>, Serializer
buffers,
GenericIndexedWriter.compressedByteBuffersWriteObjectStrategy(compression,
bufferSize, closer),
false,
- new DecompressingByteBufferObjectStrategy(order, compression)
+ DecompressingByteBufferObjectStrategy.of(order, compression)
);
}
@@ -238,75 +232,243 @@ public class GenericIndexed<T> implements
CloseableIndexed<T>, Serializer
return numberOfFilesRequired;
}
+ protected final ObjectStrategy<T> strategy;
+ protected final boolean allowReverseLookup;
+ protected final int size;
- private final boolean versionOne;
+ public GenericIndexed(
+ final ObjectStrategy<T> strategy,
+ final boolean allowReverseLookup,
+ final int size
+ )
+ {
+ this.strategy = strategy;
+ this.allowReverseLookup = allowReverseLookup;
+ this.size = size;
+ }
- private final ObjectStrategy<T> strategy;
- private final boolean allowReverseLookup;
- private final int size;
- private final ByteBuffer headerBuffer;
+ public abstract BufferIndexed singleThreaded();
- private final ByteBuffer firstValueBuffer;
+ @Override
+ public abstract long getSerializedSize();
+
+ private static final class V1<T> extends GenericIndexed<T>
+ {
+ @SuppressWarnings("rawtypes")
+ private static final MetaSerdeHelper<GenericIndexed.V1> META_SERDE_HELPER
= MetaSerdeHelper
+ .firstWriteByte((GenericIndexed.V1 x) -> VERSION_ONE)
+ .writeByte(x -> x.allowReverseLookup ? REVERSE_LOOKUP_ALLOWED :
REVERSE_LOOKUP_DISALLOWED)
+ .writeInt(x -> Ints.checkedCast(x.theBuffer.remaining() + (long)
Integer.BYTES))
+ .writeInt(x -> x.size);
+
+ private final ByteBuffer theBuffer;
+ private final int headerOffset;
+ private final int valuesOffset;
+
+ V1(
+ final ByteBuffer buffer,
+ final ObjectStrategy<T> strategy,
+ final boolean allowReverseLookup
+ )
+ {
+ super(strategy, allowReverseLookup, buffer.getInt());
+ this.theBuffer = buffer;
+ this.headerOffset = theBuffer.position();
+ this.valuesOffset = theBuffer.position() + size * Integer.BYTES;
+ }
- private final ByteBuffer[] valueBuffers;
- private int logBaseTwoOfElementsPerValueFile;
- private int relativeIndexMask;
+ @Nullable
+ @Override
+ public T get(int index)
+ {
+ checkIndex(index);
- @Nullable
- private final ByteBuffer theBuffer;
+ final int startOffset;
+ final int endOffset;
- /**
- * Constructor for version one.
- */
- GenericIndexed(
- ByteBuffer buffer,
- ObjectStrategy<T> strategy,
- boolean allowReverseLookup
- )
- {
- this.versionOne = true;
+ if (index == 0) {
+ startOffset = Integer.BYTES;
+ endOffset = theBuffer.getInt(headerOffset);
+ } else {
+ int headerPosition = (index - 1) * Integer.BYTES;
+ startOffset = theBuffer.getInt(headerOffset + headerPosition) +
Integer.BYTES;
+ endOffset = theBuffer.getInt(headerOffset + headerPosition +
Integer.BYTES);
+ }
+ return copyBufferAndGet(theBuffer, valuesOffset + startOffset,
valuesOffset + endOffset);
+ }
- this.theBuffer = buffer;
- this.strategy = strategy;
- this.allowReverseLookup = allowReverseLookup;
- size = theBuffer.getInt();
+ @Override
+ public BufferIndexed singleThreaded()
+ {
+ final ByteBuffer copyBuffer = theBuffer.asReadOnlyBuffer();
+ return new BufferIndexed()
+ {
+ @Nullable
+ @Override
+ protected ByteBuffer getByteBuffer(final int index)
+ {
+ checkIndex(index);
+
+ final int startOffset;
+ final int endOffset;
+
+ if (index == 0) {
+ startOffset = Integer.BYTES;
+ endOffset = theBuffer.getInt(headerOffset);
+ } else {
+ int headerPosition = (index - 1) * Integer.BYTES;
+ startOffset = theBuffer.getInt(headerOffset + headerPosition) +
Integer.BYTES;
+ endOffset = theBuffer.getInt(headerOffset + headerPosition +
Integer.BYTES);
+ }
+ return bufferedIndexedGetByteBuffer(copyBuffer, valuesOffset +
startOffset, valuesOffset + endOffset);
+ }
- int indexOffset = theBuffer.position();
- int valuesOffset = theBuffer.position() + size * Integer.BYTES;
+ @Override
+ public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+ {
+ inspector.visit("theBuffer", theBuffer);
+ inspector.visit("copyBuffer", copyBuffer);
+ inspector.visit("strategy", strategy);
+ }
+ };
+ }
- buffer.position(valuesOffset);
- // Ensure the value buffer's limit equals to capacity.
- firstValueBuffer = buffer.slice();
- valueBuffers = new ByteBuffer[]{firstValueBuffer};
- buffer.position(indexOffset);
- headerBuffer = buffer.slice();
+ @Override
+ public long getSerializedSize()
+ {
+ return META_SERDE_HELPER.size(this) + (long) theBuffer.remaining();
+ }
+
+ @Override
+ public void writeTo(WritableByteChannel channel, FileSmoosher smoosher)
throws IOException
+ {
+ META_SERDE_HELPER.writeTo(channel, this);
+ Channels.writeFully(channel, theBuffer.asReadOnlyBuffer());
+ }
+
+ @Override
+ public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+ {
+ inspector.visit("theBuffer", theBuffer);
+ inspector.visit("strategy", strategy);
+ }
}
+ private static final class V2<T> extends GenericIndexed<T>
+ {
+ private final ByteBuffer headerBuffer;
+ private final ByteBuffer[] valueBuffers;
+ private final int logBaseTwoOfElementsPerValueFile;
+ private final int relativeIndexMask;
+
+ private V2(
+ ByteBuffer[] valueBuffs,
+ ByteBuffer headerBuff,
+ ObjectStrategy<T> strategy,
+ boolean allowReverseLookup,
+ int logBaseTwoOfElementsPerValueFile,
+ int numWritten
+ )
+ {
+ super(strategy, allowReverseLookup, numWritten);
+ this.valueBuffers = valueBuffs;
+ this.headerBuffer = headerBuff;
+ this.logBaseTwoOfElementsPerValueFile = logBaseTwoOfElementsPerValueFile;
+ this.relativeIndexMask = (1 << logBaseTwoOfElementsPerValueFile) - 1;
+ headerBuffer.order(ByteOrder.nativeOrder());
+ }
- /**
- * Constructor for version two.
- */
- GenericIndexed(
- ByteBuffer[] valueBuffs,
- ByteBuffer headerBuff,
- ObjectStrategy<T> strategy,
- boolean allowReverseLookup,
- int logBaseTwoOfElementsPerValueFile,
- int numWritten
- )
- {
- this.versionOne = false;
+ @Nullable
+ @Override
+ public T get(int index)
+ {
+ checkIndex(index);
- this.theBuffer = null;
- this.strategy = strategy;
- this.allowReverseLookup = allowReverseLookup;
- this.valueBuffers = valueBuffs;
- this.firstValueBuffer = valueBuffers[0];
- this.headerBuffer = headerBuff;
- this.size = numWritten;
- this.logBaseTwoOfElementsPerValueFile = logBaseTwoOfElementsPerValueFile;
- this.relativeIndexMask = (1 << logBaseTwoOfElementsPerValueFile) - 1;
- headerBuffer.order(ByteOrder.nativeOrder());
+ final int startOffset;
+ final int endOffset;
+
+ int relativePositionOfIndex = index & relativeIndexMask;
+ if (relativePositionOfIndex == 0) {
+ int headerPosition = index * Integer.BYTES;
+ startOffset = Integer.BYTES;
+ endOffset = headerBuffer.getInt(headerPosition);
+ } else {
+ int headerPosition = (index - 1) * Integer.BYTES;
+ startOffset = headerBuffer.getInt(headerPosition) + Integer.BYTES;
+ endOffset = headerBuffer.getInt(headerPosition + Integer.BYTES);
+ }
+ int fileNum = index >> logBaseTwoOfElementsPerValueFile;
+ return copyBufferAndGet(valueBuffers[fileNum], startOffset, endOffset);
+ }
+
+ @Override
+ public BufferIndexed singleThreaded()
+ {
+ final ByteBuffer[] copyValueBuffers = new
ByteBuffer[valueBuffers.length];
+ for (int i = 0; i < valueBuffers.length; i++) {
+ copyValueBuffers[i] = valueBuffers[i].asReadOnlyBuffer();
+ }
+
+ return new BufferIndexed()
+ {
+ @Nullable
+ @Override
+ protected ByteBuffer getByteBuffer(int index)
+ {
+ checkIndex(index);
+
+ final int startOffset;
+ final int endOffset;
+
+ int relativePositionOfIndex = index & relativeIndexMask;
+ if (relativePositionOfIndex == 0) {
+ int headerPosition = index * Integer.BYTES;
+ startOffset = 4;
+ endOffset = headerBuffer.getInt(headerPosition);
+ } else {
+ int headerPosition = (index - 1) * Integer.BYTES;
+ startOffset = headerBuffer.getInt(headerPosition) + Integer.BYTES;
+ endOffset = headerBuffer.getInt(headerPosition + Integer.BYTES);
+ }
+ int fileNum = index >> logBaseTwoOfElementsPerValueFile;
+ return bufferedIndexedGetByteBuffer(copyValueBuffers[fileNum],
startOffset, endOffset);
+ }
+
+ @Override
+ public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+ {
+ inspector.visit("headerBuffer", headerBuffer);
+ // Inspecting just one example of copyValueBuffer, not needed to
inspect the whole array, because all buffers
+ // in it are the same.
+ inspector.visit("copyValueBuffer", copyValueBuffers.length > 0 ?
copyValueBuffers[0] : null);
+ inspector.visit("strategy", strategy);
+ }
+ };
+ }
+
+ @Override
+ public long getSerializedSize()
+ {
+ throw new UnsupportedOperationException("Method not supported for
version 2 GenericIndexed.");
+ }
+
+ @Override
+ public void writeTo(WritableByteChannel channel, FileSmoosher smoosher)
+ {
+ throw new UnsupportedOperationException(
+ "GenericIndexed serialization for V2 is unsupported. Use
GenericIndexedWriter instead.");
+ }
+
+ @Override
+ public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+ {
+ inspector.visit("headerBuffer", headerBuffer);
+
+ // Inspecting just one example of valueBuffer, not needed to inspect the
whole array, because all buffers in it
+ // are the same.
+ inspector.visit("valueBuffer", valueBuffers.length > 0 ? valueBuffers[0]
: null);
+ inspector.visit("strategy", strategy);
+ }
}
/**
@@ -317,7 +479,7 @@ public class GenericIndexed<T> implements
CloseableIndexed<T>, Serializer
*
* @param index index identifying an element of an GenericIndexed.
*/
- private void checkIndex(int index)
+ protected void checkIndex(int index)
{
if (index < 0) {
throw new IAE("Index[%s] < 0", index);
@@ -338,12 +500,6 @@ public class GenericIndexed<T> implements
CloseableIndexed<T>, Serializer
return size;
}
- @Override
- public T get(int index)
- {
- return versionOne ? getVersionOne(index) : getVersionTwo(index);
- }
-
/**
* Returns the index of "value" in this GenericIndexed object, or
(-(insertion point) - 1) if the value is not
* present, in the manner of Arrays.binarySearch. This strengthens the
contract of Indexed, which only guarantees
@@ -393,38 +549,8 @@ public class GenericIndexed<T> implements
CloseableIndexed<T>, Serializer
return IndexedIterable.create(this).iterator();
}
- @Override
- public long getSerializedSize()
- {
- if (!versionOne) {
- throw new UnsupportedOperationException("Method not supported for
version 2 GenericIndexed.");
- }
- return getSerializedSizeVersionOne();
- }
-
- @Override
- public void writeTo(WritableByteChannel channel, FileSmoosher smoosher)
throws IOException
- {
- if (versionOne) {
- writeToVersionOne(channel);
- } else {
- throw new UnsupportedOperationException(
- "GenericIndexed serialization for V2 is unsupported. Use
GenericIndexedWriter instead.");
- }
- }
-
- /**
- * Create a non-thread-safe Indexed, which may perform better than the
underlying Indexed.
- *
- * @return a non-thread-safe Indexed
- */
- public GenericIndexed<T>.BufferIndexed singleThreaded()
- {
- return versionOne ? singleThreadedVersionOne() :
singleThreadedVersionTwo();
- }
-
@Nullable
- private T copyBufferAndGet(ByteBuffer valueBuffer, int startOffset, int
endOffset)
+ protected T copyBufferAndGet(ByteBuffer valueBuffer, int startOffset, int
endOffset)
{
ByteBuffer copyValueBuffer = valueBuffer.asReadOnlyBuffer();
int size = endOffset - startOffset;
@@ -439,21 +565,6 @@ public class GenericIndexed<T> implements
CloseableIndexed<T>, Serializer
return strategy.fromByteBuffer(copyValueBuffer, size);
}
- @Override
- public void inspectRuntimeShape(RuntimeShapeInspector inspector)
- {
- inspector.visit("versionOne", versionOne);
- inspector.visit("headerBuffer", headerBuffer);
- if (versionOne) {
- inspector.visit("firstValueBuffer", firstValueBuffer);
- } else {
- // Inspecting just one example of valueBuffer, not needed to inspect the
whole array, because all buffers in it
- // are the same.
- inspector.visit("valueBuffer", valueBuffers.length > 0 ? valueBuffers[0]
: null);
- }
- inspector.visit("strategy", strategy);
- }
-
/**
* Single-threaded view.
*/
@@ -567,10 +678,6 @@ public class GenericIndexed<T> implements
CloseableIndexed<T>, Serializer
// nothing to close
}
- ///////////////
- // VERSION ONE
- ///////////////
-
private static <T> GenericIndexed<T>
createGenericIndexedVersionOne(ByteBuffer byteBuffer, ObjectStrategy<T>
strategy)
{
boolean allowReverseLookup = byteBuffer.get() == REVERSE_LOOKUP_ALLOWED;
@@ -579,7 +686,7 @@ public class GenericIndexed<T> implements
CloseableIndexed<T>, Serializer
bufferToUse.limit(bufferToUse.position() + size);
byteBuffer.position(bufferToUse.limit());
- return new GenericIndexed<>(
+ return new GenericIndexed.V1<>(
bufferToUse,
strategy,
allowReverseLookup
@@ -597,7 +704,7 @@ public class GenericIndexed<T> implements
CloseableIndexed<T>, Serializer
if (!objects.hasNext()) {
final ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES).putInt(0);
buffer.flip();
- return new GenericIndexed<>(buffer, resultObjectStrategy,
allowReverseLookup);
+ return new GenericIndexed.V1<>(buffer, resultObjectStrategy,
allowReverseLookup);
}
int count = 0;
@@ -642,79 +749,9 @@ public class GenericIndexed<T> implements
CloseableIndexed<T>, Serializer
valuesOut.writeTo(theBuffer);
theBuffer.flip();
- return new GenericIndexed<>(theBuffer.asReadOnlyBuffer(),
resultObjectStrategy, allowReverseLookup);
- }
-
- private long getSerializedSizeVersionOne()
- {
- return META_SERDE_HELPER.size(this) + (long) theBuffer.remaining();
- }
-
- @Nullable
- private T getVersionOne(int index)
- {
- checkIndex(index);
-
- final int startOffset;
- final int endOffset;
-
- if (index == 0) {
- startOffset = Integer.BYTES;
- endOffset = headerBuffer.getInt(0);
- } else {
- int headerPosition = (index - 1) * Integer.BYTES;
- startOffset = headerBuffer.getInt(headerPosition) + Integer.BYTES;
- endOffset = headerBuffer.getInt(headerPosition + Integer.BYTES);
- }
- return copyBufferAndGet(firstValueBuffer, startOffset, endOffset);
+ return new GenericIndexed.V1<>(theBuffer.asReadOnlyBuffer(),
resultObjectStrategy, allowReverseLookup);
}
- private BufferIndexed singleThreadedVersionOne()
- {
- final ByteBuffer copyBuffer = firstValueBuffer.asReadOnlyBuffer();
- return new BufferIndexed()
- {
- @Nullable
- @Override
- protected ByteBuffer getByteBuffer(final int index)
- {
- checkIndex(index);
-
- final int startOffset;
- final int endOffset;
-
- if (index == 0) {
- startOffset = Integer.BYTES;
- endOffset = headerBuffer.getInt(0);
- } else {
- int headerPosition = (index - 1) * Integer.BYTES;
- startOffset = headerBuffer.getInt(headerPosition) + Integer.BYTES;
- endOffset = headerBuffer.getInt(headerPosition + Integer.BYTES);
- }
- return bufferedIndexedGetByteBuffer(copyBuffer, startOffset,
endOffset);
- }
-
- @Override
- public void inspectRuntimeShape(RuntimeShapeInspector inspector)
- {
- inspector.visit("headerBuffer", headerBuffer);
- inspector.visit("copyBuffer", copyBuffer);
- inspector.visit("strategy", strategy);
- }
- };
- }
-
- private void writeToVersionOne(WritableByteChannel channel) throws
IOException
- {
- META_SERDE_HELPER.writeTo(channel, this);
- Channels.writeFully(channel, theBuffer.asReadOnlyBuffer());
- }
-
-
- ///////////////
- // VERSION TWO
- ///////////////
-
private static <T> GenericIndexed<T> createGenericIndexedVersionTwo(
ByteBuffer byteBuffer,
ObjectStrategy<T> strategy,
@@ -739,7 +776,7 @@ public class GenericIndexed<T> implements
CloseableIndexed<T>, Serializer
valueBuffersToUse[i] = valueBuffer.asReadOnlyBuffer();
}
ByteBuffer headerBuffer =
fileMapper.mapFile(GenericIndexedWriter.generateHeaderFileName(columnName));
- return new GenericIndexed<>(
+ return new GenericIndexed.V2<>(
valueBuffersToUse,
headerBuffer,
strategy,
@@ -752,70 +789,4 @@ public class GenericIndexed<T> implements
CloseableIndexed<T>, Serializer
throw new RuntimeException("File mapping failed.", e);
}
}
-
- @Nullable
- private T getVersionTwo(int index)
- {
- checkIndex(index);
-
- final int startOffset;
- final int endOffset;
-
- int relativePositionOfIndex = index & relativeIndexMask;
- if (relativePositionOfIndex == 0) {
- int headerPosition = index * Integer.BYTES;
- startOffset = Integer.BYTES;
- endOffset = headerBuffer.getInt(headerPosition);
- } else {
- int headerPosition = (index - 1) * Integer.BYTES;
- startOffset = headerBuffer.getInt(headerPosition) + Integer.BYTES;
- endOffset = headerBuffer.getInt(headerPosition + Integer.BYTES);
- }
- int fileNum = index >> logBaseTwoOfElementsPerValueFile;
- return copyBufferAndGet(valueBuffers[fileNum], startOffset, endOffset);
- }
-
- private BufferIndexed singleThreadedVersionTwo()
- {
- final ByteBuffer[] copyValueBuffers = new ByteBuffer[valueBuffers.length];
- for (int i = 0; i < valueBuffers.length; i++) {
- copyValueBuffers[i] = valueBuffers[i].asReadOnlyBuffer();
- }
-
- return new BufferIndexed()
- {
- @Nullable
- @Override
- protected ByteBuffer getByteBuffer(int index)
- {
- checkIndex(index);
-
- final int startOffset;
- final int endOffset;
-
- int relativePositionOfIndex = index & relativeIndexMask;
- if (relativePositionOfIndex == 0) {
- int headerPosition = index * Integer.BYTES;
- startOffset = 4;
- endOffset = headerBuffer.getInt(headerPosition);
- } else {
- int headerPosition = (index - 1) * Integer.BYTES;
- startOffset = headerBuffer.getInt(headerPosition) + Integer.BYTES;
- endOffset = headerBuffer.getInt(headerPosition + Integer.BYTES);
- }
- int fileNum = index >> logBaseTwoOfElementsPerValueFile;
- return bufferedIndexedGetByteBuffer(copyValueBuffers[fileNum],
startOffset, endOffset);
- }
-
- @Override
- public void inspectRuntimeShape(RuntimeShapeInspector inspector)
- {
- inspector.visit("headerBuffer", headerBuffer);
- // Inspecting just one example of copyValueBuffer, not needed to
inspect the whole array, because all buffers
- // in it are the same.
- inspector.visit("copyValueBuffer", copyValueBuffers.length > 0 ?
copyValueBuffers[0] : null);
- inspector.visit("strategy", strategy);
- }
- };
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]