This is an automated email from the ASF dual-hosted git repository.
adarshsanjeev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 2726c6f3882 Minor refactors to processing
2726c6f3882 is described below
commit 2726c6f38823d42f7f82d3b4e8742f7a80fff19f
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Thu Nov 21 15:37:55 2024 +0530
Minor refactors to processing
Some refactors across druid to clean up the code and add utility functions
where required.
---
.../org/apache/druid/msq/test/MSQTestBase.java | 3 +-
.../druid/query/aggregation/SerializedStorage.java | 195 ++++++++----
.../java/org/apache/druid/segment/IndexIO.java | 3 +-
.../java/org/apache/druid/segment/IndexSpec.java | 2 +-
.../column/StringUtf8DictionaryEncodedColumn.java | 11 +-
.../data/BlockLayoutColumnarDoublesSupplier.java | 12 +-
.../data/BlockLayoutColumnarLongsSupplier.java | 15 +-
.../data/CompressedVSizeColumnarIntsSupplier.java | 7 +-
.../druid/segment/data/CompressionFactory.java | 1 -
.../nested/NestedCommonFormatColumnSerializer.java | 2 +-
.../segment/nested/NestedDataComplexTypeSerde.java | 86 +++--
.../druid/segment/nested/ScalarDoubleColumn.java | 27 +-
.../nested/ScalarDoubleColumnAndIndexSupplier.java | 21 +-
.../druid/segment/nested/ScalarLongColumn.java | 27 +-
.../nested/ScalarLongColumnAndIndexSupplier.java | 21 +-
.../nested/ScalarStringColumnAndIndexSupplier.java | 9 +-
.../apache/druid/segment/nested/VariantColumn.java | 6 +-
.../nested/VariantColumnAndIndexSupplier.java | 3 +-
.../segment/nested/VariantColumnSerializer.java | 349 +++++++++++++--------
.../serde/DictionaryEncodedColumnPartSerde.java | 3 +-
.../StringUtf8DictionaryEncodedColumnSupplier.java | 15 +-
.../druid/segment/serde/cell/IOIterator.java | 4 +-
.../druid/segment/serde/cell/StagedSerde.java | 35 +++
.../filter/PredicateValueMatcherFactoryTest.java | 7 +-
.../druid/segment/filter/ValueMatchersTest.java | 11 +-
25 files changed, 621 insertions(+), 254 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index e99b571b807..060f8499e3e 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -511,7 +511,6 @@ public class MSQTestBase extends BaseCalciteQueryTest
binder ->
binder.bind(SegmentManager.class).toInstance(EasyMock.createMock(SegmentManager.class)),
new JoinableFactoryModule(),
new IndexingServiceTuningConfigModule(),
- new MSQIndexingModule(),
Modules.override(new MSQSqlModule()).with(
binder -> {
// Our Guice configuration currently requires bindings to exist
even if they aren't ever used, the
@@ -540,6 +539,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
objectMapper = setupObjectMapper(injector);
objectMapper.registerModules(new
StorageConnectorModule().getJacksonModules());
+ objectMapper.registerModules(new MSQIndexingModule().getJacksonModules());
objectMapper.registerModules(sqlModule.getJacksonModules());
objectMapper.registerModules(BuiltInTypesModule.getJacksonModulesList());
@@ -697,7 +697,6 @@ public class MSQTestBase extends BaseCalciteQueryTest
break;
default:
throw new ISE("Cannot query segment %s in test runner", segmentId);
-
}
Segment segment = new Segment()
{
diff --git
a/processing/src/main/java/org/apache/druid/query/aggregation/SerializedStorage.java
b/processing/src/main/java/org/apache/druid/query/aggregation/SerializedStorage.java
index a6fee46b3ca..29d03d14615 100644
---
a/processing/src/main/java/org/apache/druid/query/aggregation/SerializedStorage.java
+++
b/processing/src/main/java/org/apache/druid/query/aggregation/SerializedStorage.java
@@ -19,18 +19,17 @@
package org.apache.druid.query.aggregation;
-import com.google.common.base.Preconditions;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import org.apache.druid.error.DruidException;
import org.apache.druid.segment.serde.cell.IOIterator;
-import org.apache.druid.segment.serde.cell.IntSerializer;
import org.apache.druid.segment.serde.cell.StagedSerde;
import org.apache.druid.segment.writeout.WriteOutBytes;
import javax.annotation.Nullable;
-import java.io.BufferedInputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import java.nio.IntBuffer;
import java.util.NoSuchElementException;
/**
@@ -45,109 +44,181 @@ public class SerializedStorage<T>
{
private final WriteOutBytes writeOutBytes;
private final StagedSerde<T> serde;
- private final IntSerializer intSerializer = new IntSerializer();
+ private final ByteBuffer itemOffsetsBytes;
+ private final IntBuffer itemSizes;
+
+ private final LongArrayList rowChunkOffsets = new LongArrayList();
+ private int numStored = 0;
+ private int maxSize = 0;
public SerializedStorage(WriteOutBytes writeOutBytes, StagedSerde<T> serde)
+ {
+ this(writeOutBytes, serde, 4096);
+ }
+
+ public SerializedStorage(WriteOutBytes writeOutBytes, StagedSerde<T> serde,
int chunkSize)
{
this.writeOutBytes = writeOutBytes;
this.serde = serde;
+
+ this.itemOffsetsBytes = ByteBuffer.allocate(chunkSize *
Integer.BYTES).order(ByteOrder.nativeOrder());
+ this.itemSizes = itemOffsetsBytes.asIntBuffer();
}
public void store(@Nullable T value) throws IOException
{
byte[] bytes = serde.serialize(value);
- writeOutBytes.write(intSerializer.serialize(bytes.length));
- writeOutBytes.write(bytes);
+ maxSize = Math.max(maxSize, bytes.length);
+ itemSizes.put(bytes.length);
+ if (bytes.length > 0) {
+ writeOutBytes.write(bytes);
+ }
+
+ ++numStored;
+ if (itemSizes.remaining() == 0) {
+ rowChunkOffsets.add(writeOutBytes.size());
+ writeOutBytes.write(itemOffsetsBytes);
+ itemOffsetsBytes.clear();
+ itemSizes.clear();
+ }
}
+ public int numStored()
+ {
+ return numStored;
+ }
+
+ /**
+ * Generates an iterator over everything that has been stored. Also
signifies the end of storing objects.
+ * iterator() can be called multiple times if needed, but after iterator()
is called, store() can no longer be
+ * called.
+ *
+ * @return an iterator
+ * @throws IOException on failure
+ */
public IOIterator<T> iterator() throws IOException
{
- return new DeserializingIOIterator<>(writeOutBytes.asInputStream(), serde);
+ if (itemSizes.position() != itemSizes.limit()) {
+ rowChunkOffsets.add(writeOutBytes.size());
+ itemOffsetsBytes.limit(itemSizes.position() * Integer.BYTES);
+ writeOutBytes.write(itemOffsetsBytes);
+
+ // Move the limit to the position so that we fail subsequent writes and
indicate that we are done
+ itemSizes.limit(itemSizes.position());
+ }
+
+ return new DeserializingIOIterator<>(
+ writeOutBytes,
+ rowChunkOffsets,
+ numStored,
+ itemSizes.capacity(),
+ maxSize,
+ serde
+ );
}
private static class DeserializingIOIterator<T> implements IOIterator<T>
{
- private static final int NEEDS_READ = -2;
- private static final int EOF = -1;
+ private static final ByteBuffer EMPTY_BUFFER =
ByteBuffer.allocate(0).asReadOnlyBuffer();
- private final byte[] intBytes;
- private final BufferedInputStream inputStream;
+ private final WriteOutBytes medium;
+ private final LongArrayList rowChunkOffsets;
+ private final int numEntries;
+ private ByteBuffer tmpBuf;
private final StagedSerde<T> serde;
- private int nextSize;
-
- public DeserializingIOIterator(InputStream inputStream, StagedSerde<T>
serde)
+ private final ByteBuffer itemOffsetsBytes;
+ private final int[] itemSizes;
+
+ private long itemStartOffset;
+ private int chunkId = 0;
+ private int currId = 0;
+ private int itemIndex;
+
+ public DeserializingIOIterator(
+ WriteOutBytes medium,
+ LongArrayList rowChunkOffsets,
+ int numEntries,
+ int chunkSize,
+ int maxSize,
+ StagedSerde<T> serde
+ )
{
- this.inputStream = new BufferedInputStream(inputStream);
+ this.medium = medium;
+ this.rowChunkOffsets = rowChunkOffsets;
+ this.numEntries = numEntries;
+ this.tmpBuf =
ByteBuffer.allocate(maxSize).order(ByteOrder.nativeOrder());
this.serde = serde;
- intBytes = new byte[Integer.BYTES];
- nextSize = NEEDS_READ;
+
+ this.itemOffsetsBytes = ByteBuffer.allocate(chunkSize *
Integer.BYTES).order(ByteOrder.nativeOrder());
+ this.itemSizes = new int[chunkSize];
+ this.itemIndex = chunkSize;
}
@Override
- public boolean hasNext() throws IOException
+ public boolean hasNext()
{
- return getNextSize() > EOF;
+ return currId < numEntries;
}
@Override
public T next() throws IOException
{
- int currentNextSize = getNextSize();
-
- if (currentNextSize == -1) {
- throw new NoSuchElementException("end of buffer reached");
+ if (currId >= numEntries) {
+ throw new NoSuchElementException();
}
- byte[] nextBytes = new byte[currentNextSize];
- int bytesRead = 0;
-
- while (bytesRead < currentNextSize) {
- int result = inputStream.read(nextBytes, bytesRead, currentNextSize -
bytesRead);
-
- if (result == -1) {
- throw new NoSuchElementException("unexpected end of buffer reached");
+ if (itemIndex >= itemSizes.length) {
+ if (chunkId == 0) {
+ itemStartOffset = 0;
+ } else {
+ if (itemStartOffset != rowChunkOffsets.getLong(chunkId - 1)) {
+ throw DruidException.defensive(
+ "Should have read up to the start of the offsets [%,d], "
+ + "but for some reason the values [%,d] don't align. Possible
corruption?",
+ rowChunkOffsets.getLong(chunkId - 1),
+ itemStartOffset
+ );
+ }
+ itemStartOffset += (((long) itemSizes.length) * Integer.BYTES);
}
- bytesRead += result;
- }
-
- Preconditions.checkState(bytesRead == currentNextSize);
- T value = serde.deserialize(nextBytes);
-
- nextSize = NEEDS_READ;
-
- return value;
- }
-
- private int getNextSize() throws IOException
- {
- if (nextSize == NEEDS_READ) {
- int bytesRead = 0;
-
- while (bytesRead < Integer.BYTES) {
- int result = inputStream.read(intBytes, bytesRead, Integer.BYTES -
bytesRead);
+ int numToRead = Math.min(itemSizes.length, numEntries - (chunkId *
itemSizes.length));
+ final long readOffset = rowChunkOffsets.getLong(chunkId++);
+ itemOffsetsBytes.clear();
+ itemOffsetsBytes.limit(numToRead * Integer.BYTES);
+ medium.readFully(readOffset, itemOffsetsBytes);
+ itemOffsetsBytes.flip();
+ itemOffsetsBytes.asIntBuffer().get(itemSizes, 0, numToRead);
- if (result == -1) {
- nextSize = EOF;
- return EOF;
- } else {
- bytesRead += result;
- }
- }
- Preconditions.checkState(bytesRead == Integer.BYTES);
+ itemIndex = 0;
+ }
- nextSize =
ByteBuffer.wrap(intBytes).order(ByteOrder.nativeOrder()).getInt();
+ int bytesToRead = itemSizes[itemIndex];
+ final T retVal;
+ if (bytesToRead == 0) {
+ retVal = serde.deserialize(EMPTY_BUFFER);
+ } else {
+ tmpBuf.clear();
+ tmpBuf.limit(bytesToRead);
+ medium.readFully(itemStartOffset, tmpBuf);
+ tmpBuf.flip();
+
+ retVal = serde.deserialize(tmpBuf);
}
- return nextSize;
+ itemStartOffset += bytesToRead;
+ ++itemIndex;
+ ++currId;
+
+ return retVal;
}
@Override
- public void close() throws IOException
+ public void close()
{
- inputStream.close();
+
}
}
}
diff --git a/processing/src/main/java/org/apache/druid/segment/IndexIO.java
b/processing/src/main/java/org/apache/druid/segment/IndexIO.java
index 8470c63f3e7..a6ddad61406 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexIO.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexIO.java
@@ -456,7 +456,8 @@ public class IndexIO
new StringUtf8DictionaryEncodedColumnSupplier<>(
index.getDimValueUtf8Lookup(dimension)::singleThreaded,
null,
- Suppliers.ofInstance(index.getDimColumn(dimension))
+ Suppliers.ofInstance(index.getDimColumn(dimension)),
+ LEGACY_FACTORY.getBitmapFactory()
)
);
GenericIndexed<ImmutableBitmap> bitmaps =
index.getBitmapIndexes().get(dimension);
diff --git a/processing/src/main/java/org/apache/druid/segment/IndexSpec.java
b/processing/src/main/java/org/apache/druid/segment/IndexSpec.java
index 37a7e6cc0d6..1dc2f849682 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexSpec.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexSpec.java
@@ -43,7 +43,7 @@ import java.util.Objects;
*/
public class IndexSpec
{
- public static IndexSpec DEFAULT = IndexSpec.builder().build();
+ public static final IndexSpec DEFAULT = IndexSpec.builder().build();
public static Builder builder()
{
diff --git
a/processing/src/main/java/org/apache/druid/segment/column/StringUtf8DictionaryEncodedColumn.java
b/processing/src/main/java/org/apache/druid/segment/column/StringUtf8DictionaryEncodedColumn.java
index bed58a43675..59da8ffcb0a 100644
---
a/processing/src/main/java/org/apache/druid/segment/column/StringUtf8DictionaryEncodedColumn.java
+++
b/processing/src/main/java/org/apache/druid/segment/column/StringUtf8DictionaryEncodedColumn.java
@@ -20,6 +20,7 @@
package org.apache.druid.segment.column;
import com.google.common.collect.Lists;
+import org.apache.druid.collections.bitmap.BitmapFactory;
import org.apache.druid.common.semantic.SemanticUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.extraction.ExtractionFn;
@@ -73,16 +74,19 @@ public class StringUtf8DictionaryEncodedColumn implements
DictionaryEncodedColum
@Nullable
private final ColumnarMultiInts multiValueColumn;
private final Indexed<ByteBuffer> utf8Dictionary;
+ private final BitmapFactory bitmapFactory;
public StringUtf8DictionaryEncodedColumn(
@Nullable ColumnarInts singleValueColumn,
@Nullable ColumnarMultiInts multiValueColumn,
- Indexed<ByteBuffer> utf8Dictionary
+ Indexed<ByteBuffer> utf8Dictionary,
+ BitmapFactory bitmapFactory
)
{
this.column = singleValueColumn;
this.multiValueColumn = multiValueColumn;
this.utf8Dictionary = utf8Dictionary;
+ this.bitmapFactory = bitmapFactory;
}
@Override
@@ -135,6 +139,11 @@ public class StringUtf8DictionaryEncodedColumn implements
DictionaryEncodedColum
return utf8Dictionary.size();
}
+ public BitmapFactory getBitmapFactory()
+ {
+ return bitmapFactory;
+ }
+
@Override
public HistoricalDimensionSelector makeDimensionSelector(
final ReadableOffset offset,
diff --git
a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java
b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java
index 4473fa77d46..010e0b69857 100644
---
a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java
+++
b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java
@@ -36,6 +36,7 @@ public class BlockLayoutColumnarDoublesSupplier implements
Supplier<ColumnarDoub
// The number of doubles per buffer.
private final int sizePer;
+ private final CompressionStrategy strategy;
public BlockLayoutColumnarDoublesSupplier(
int totalSize,
@@ -45,7 +46,8 @@ public class BlockLayoutColumnarDoublesSupplier implements
Supplier<ColumnarDoub
CompressionStrategy strategy
)
{
- baseDoubleBuffers = GenericIndexed.read(fromBuffer,
DecompressingByteBufferObjectStrategy.of(byteOrder, strategy));
+ this.strategy = strategy;
+ this.baseDoubleBuffers = GenericIndexed.read(fromBuffer,
DecompressingByteBufferObjectStrategy.of(byteOrder, strategy));
this.totalSize = totalSize;
this.sizePer = sizePer;
}
@@ -78,7 +80,8 @@ public class BlockLayoutColumnarDoublesSupplier implements
Supplier<ColumnarDoub
}
}
- private class BlockLayoutColumnarDoubles implements ColumnarDoubles
+ // This needs to be a public class so that SemanticCreator is able to call
it.
+ public class BlockLayoutColumnarDoubles implements ColumnarDoubles
{
final Indexed<ResourceHolder<ByteBuffer>> singleThreadedDoubleBuffers =
baseDoubleBuffers.singleThreaded();
@@ -91,6 +94,11 @@ public class BlockLayoutColumnarDoublesSupplier implements
Supplier<ColumnarDoub
@Nullable
DoubleBuffer doubleBuffer;
+ public CompressionStrategy getCompressionStrategy()
+ {
+ return strategy;
+ }
+
@Override
public int size()
{
diff --git
a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java
b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java
index 2b46d9aa6e2..aa0346c6e34 100644
---
a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java
+++
b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java
@@ -43,6 +43,7 @@ public class BlockLayoutColumnarLongsSupplier implements
Supplier<ColumnarLongs>
// The number of longs per buffer.
private final int sizePer;
private final CompressionFactory.LongEncodingReader baseReader;
+ private final CompressionStrategy strategy;
public BlockLayoutColumnarLongsSupplier(
int totalSize,
@@ -53,6 +54,7 @@ public class BlockLayoutColumnarLongsSupplier implements
Supplier<ColumnarLongs>
CompressionStrategy strategy
)
{
+ this.strategy = strategy;
this.baseLongBuffers = GenericIndexed.read(fromBuffer,
DecompressingByteBufferObjectStrategy.of(order, strategy));
this.totalSize = totalSize;
this.sizePer = sizePer;
@@ -124,7 +126,8 @@ public class BlockLayoutColumnarLongsSupplier implements
Supplier<ColumnarLongs>
}
}
- private class BlockLayoutColumnarLongs implements ColumnarLongs
+ // This needs to be a public class so that SemanticCreator is able to call
it.
+ public class BlockLayoutColumnarLongs implements ColumnarLongs
{
final CompressionFactory.LongEncodingReader reader =
baseReader.duplicate();
final Indexed<ResourceHolder<ByteBuffer>> singleThreadedLongBuffers =
baseLongBuffers.singleThreaded();
@@ -140,6 +143,16 @@ public class BlockLayoutColumnarLongsSupplier implements
Supplier<ColumnarLongs>
@Nullable
LongBuffer longBuffer;
+ public CompressionFactory.LongEncodingStrategy getEncodingStrategy()
+ {
+ return baseReader.getStrategy();
+ }
+
+ public CompressionStrategy getCompressionStrategy()
+ {
+ return strategy;
+ }
+
@Override
public int size()
{
diff --git
a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java
b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java
index eaf0b2a47a9..dfa6667bf01 100644
---
a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java
+++
b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java
@@ -308,7 +308,7 @@ public class CompressedVSizeColumnarIntsSupplier implements
WritableSupplier<Col
}
}
- private class CompressedVSizeColumnarInts implements ColumnarInts
+ public class CompressedVSizeColumnarInts implements ColumnarInts
{
final Indexed<ResourceHolder<ByteBuffer>> singleThreadedBuffers =
baseBuffers.singleThreaded();
@@ -329,6 +329,11 @@ public class CompressedVSizeColumnarIntsSupplier
implements WritableSupplier<Col
return totalSize;
}
+ public CompressionStrategy getCompressionStrategy()
+ {
+ return compression;
+ }
+
/**
* Returns the value at the given index into the column.
* <p/>
diff --git
a/processing/src/main/java/org/apache/druid/segment/data/CompressionFactory.java
b/processing/src/main/java/org/apache/druid/segment/data/CompressionFactory.java
index 91ec70b7f17..9e7d2ea5b61 100644
---
a/processing/src/main/java/org/apache/druid/segment/data/CompressionFactory.java
+++
b/processing/src/main/java/org/apache/druid/segment/data/CompressionFactory.java
@@ -303,7 +303,6 @@ public class CompressionFactory
*/
LongEncodingReader duplicate();
- @SuppressWarnings("unused")
LongEncodingStrategy getStrategy();
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/NestedCommonFormatColumnSerializer.java
b/processing/src/main/java/org/apache/druid/segment/nested/NestedCommonFormatColumnSerializer.java
index 54653f286a9..88b5c240e1b 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/NestedCommonFormatColumnSerializer.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/NestedCommonFormatColumnSerializer.java
@@ -79,7 +79,7 @@ public abstract class NestedCommonFormatColumnSerializer
implements GenericColum
ColumnSerializerUtils.writeInternal(smoosher, serializer, getColumnName(),
fileName);
}
- protected void copyFromTempSmoosh(FileSmoosher smoosher, SmooshedFileMapper
fileMapper) throws IOException
+ protected static void copyFromTempSmoosh(FileSmoosher smoosher,
SmooshedFileMapper fileMapper) throws IOException
{
for (String internalName : fileMapper.getInternalFilenames()) {
smoosher.add(internalName, fileMapper.mapFile(internalName));
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataComplexTypeSerde.java
b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataComplexTypeSerde.java
index cb11f62bb0f..3236b42402c 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataComplexTypeSerde.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataComplexTypeSerde.java
@@ -22,7 +22,7 @@ package org.apache.druid.segment.nested;
import com.fasterxml.jackson.core.JsonProcessingException;
import it.unimi.dsi.fastutil.Hash;
import org.apache.druid.data.input.impl.DimensionSchema;
-import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.segment.DimensionHandler;
import org.apache.druid.segment.NestedDataColumnHandlerV4;
@@ -117,29 +117,14 @@ public class NestedDataComplexTypeSerde extends
ComplexMetricSerde
@Override
public Object fromByteBuffer(ByteBuffer buffer, int numBytes)
{
- final byte[] bytes = new byte[numBytes];
- buffer.get(bytes, 0, numBytes);
- try {
- return ColumnSerializerUtils.SMILE_MAPPER.readValue(bytes,
StructuredData.class);
- }
- catch (IOException e) {
- throw new ISE(e, "Unable to deserialize value");
- }
+ return deserializeBuffer(buffer, numBytes);
}
@Nullable
@Override
public byte[] toBytes(@Nullable Object val)
{
- if (val == null) {
- return new byte[0];
- }
- try {
- return ColumnSerializerUtils.SMILE_MAPPER.writeValueAsBytes(val);
- }
- catch (JsonProcessingException e) {
- throw new ISE(e, "Unable to serialize value [%s]", val);
- }
+ return serializeToBytes(val);
}
@Override
@@ -150,6 +135,71 @@ public class NestedDataComplexTypeSerde extends
ComplexMetricSerde
};
}
+ /**
+ * Reads numBytes from the position to the limit of the byte buffer argument
and deserializes it into
+ * a {@link StructuredData} object using {@link
ColumnSerializerUtils#SMILE_MAPPER}.
+ */
+ public static StructuredData deserializeBuffer(ByteBuffer buf)
+ {
+ return deserializeBuffer(buf, buf.remaining());
+ }
+
+ /**
+ * Reads numBytes from the byte buffer argument and deserializes it into a
{@link StructuredData} object
+ * using {@link ColumnSerializerUtils#SMILE_MAPPER}.
+ */
+ public static StructuredData deserializeBuffer(ByteBuffer buf, int numBytes)
+ {
+ if (numBytes == 0) {
+ return null;
+ }
+
+ final byte[] bytes = new byte[numBytes];
+ buf.get(bytes, 0, numBytes);
+ return deserializeBytes(bytes);
+ }
+
+ /**
+ * Converts the bytes array into a {@link StructuredData} object using
{@link ColumnSerializerUtils#SMILE_MAPPER}.
+ */
+ public static StructuredData deserializeBytes(byte[] bytes)
+ {
+ return deserializeBytes(bytes, 0, bytes.length);
+ }
+
+ /**
+ * Reads the bytes between offset and len from the byte array and
deserializes a {@link StructuredData} object from
+ * it, using {@link ColumnSerializerUtils#SMILE_MAPPER}.
+ */
+ public static StructuredData deserializeBytes(byte[] bytes, int offset, int
len)
+ {
+ if (len == 0) {
+ return null;
+ }
+ try {
+ return ColumnSerializerUtils.SMILE_MAPPER.readValue(bytes, offset, len,
StructuredData.class);
+ }
+ catch (IOException e) {
+ throw DruidException.defensive(e, "Unable to deserialize value");
+ }
+ }
+
+ /**
+ * Returns a byte array containing the val as serialized by {@link
ColumnSerializerUtils#SMILE_MAPPER}.
+ */
+ public static byte[] serializeToBytes(@Nullable Object val)
+ {
+ if (val == null) {
+ return new byte[0];
+ }
+ try {
+ return ColumnSerializerUtils.SMILE_MAPPER.writeValueAsBytes(val);
+ }
+ catch (JsonProcessingException e) {
+ throw DruidException.defensive(e, "Unable to serialize value [%s]", val);
+ }
+ }
+
@Override
public <T extends Comparable<T>> TypeStrategy<T> getTypeStrategy()
{
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumn.java
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumn.java
index 6ade80b6e83..e116a70d28d 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumn.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumn.java
@@ -19,6 +19,7 @@
package org.apache.druid.segment.nested;
+import org.apache.druid.collections.bitmap.BitmapFactory;
import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.common.semantic.SemanticUtils;
@@ -27,6 +28,7 @@ import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DoubleColumnSelector;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.data.ColumnarDoubles;
+import org.apache.druid.segment.data.ColumnarInts;
import org.apache.druid.segment.data.FixedIndexed;
import org.apache.druid.segment.data.Indexed;
import org.apache.druid.segment.data.ReadableOffset;
@@ -40,6 +42,7 @@ import org.roaringbitmap.PeekableIntIterator;
import javax.annotation.Nullable;
import java.util.Map;
import java.util.function.Function;
+import java.util.function.Supplier;
/**
* {@link NestedCommonFormatColumn} for {@link ColumnType#DOUBLE}
@@ -50,18 +53,24 @@ public class ScalarDoubleColumn implements
NestedCommonFormatColumn
SemanticUtils.makeAsMap(ScalarDoubleColumn.class);
private final FixedIndexed<Double> doubleDictionary;
+ private final Supplier<ColumnarInts> encodedValuesSupplier;
private final ColumnarDoubles valueColumn;
- private final ImmutableBitmap nullValueIndex;
+ private final ImmutableBitmap nullValueBitmap;
+ private final BitmapFactory bitmapFactory;
public ScalarDoubleColumn(
FixedIndexed<Double> doubleDictionary,
+ Supplier<ColumnarInts> encodedValuesSupplier,
ColumnarDoubles valueColumn,
- ImmutableBitmap nullValueIndex
+ ImmutableBitmap nullValueBitmap,
+ BitmapFactory bitmapFactory
)
{
this.doubleDictionary = doubleDictionary;
+ this.encodedValuesSupplier = encodedValuesSupplier;
this.valueColumn = valueColumn;
- this.nullValueIndex = nullValueIndex;
+ this.nullValueBitmap = nullValueBitmap;
+ this.bitmapFactory = bitmapFactory;
}
@Override
@@ -81,7 +90,7 @@ public class ScalarDoubleColumn implements
NestedCommonFormatColumn
{
return new DoubleColumnSelector()
{
- private PeekableIntIterator nullIterator =
nullValueIndex.peekableIterator();
+ private PeekableIntIterator nullIterator =
nullValueBitmap.peekableIterator();
private int nullMark = -1;
private int offsetMark = -1;
@@ -95,7 +104,7 @@ public class ScalarDoubleColumn implements
NestedCommonFormatColumn
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("doubleColumn", valueColumn);
- inspector.visit("nullBitmap", nullValueIndex);
+ inspector.visit("nullBitmap", nullValueBitmap);
}
@Override
@@ -108,7 +117,7 @@ public class ScalarDoubleColumn implements
NestedCommonFormatColumn
if (i < offsetMark) {
// offset was reset, reset iterator state
nullMark = -1;
- nullIterator = nullValueIndex.peekableIterator();
+ nullIterator = nullValueBitmap.peekableIterator();
}
offsetMark = i;
if (nullMark < i) {
@@ -133,7 +142,7 @@ public class ScalarDoubleColumn implements
NestedCommonFormatColumn
private int id = ReadableVectorInspector.NULL_ID;
@Nullable
- private PeekableIntIterator nullIterator = nullValueIndex != null ?
nullValueIndex.peekableIterator() : null;
+ private PeekableIntIterator nullIterator = nullValueBitmap != null ?
nullValueBitmap.peekableIterator() : null;
private int offsetMark = -1;
@Override
@@ -162,14 +171,14 @@ public class ScalarDoubleColumn implements
NestedCommonFormatColumn
if (offset.isContiguous()) {
if (offset.getStartOffset() < offsetMark) {
- nullIterator = nullValueIndex.peekableIterator();
+ nullIterator = nullValueBitmap.peekableIterator();
}
offsetMark = offset.getStartOffset() + offset.getCurrentVectorSize();
valueColumn.get(valueVector, offset.getStartOffset(),
offset.getCurrentVectorSize());
} else {
final int[] offsets = offset.getOffsets();
if (offsets[offsets.length - 1] < offsetMark) {
- nullIterator = nullValueIndex.peekableIterator();
+ nullIterator = nullValueBitmap.peekableIterator();
}
offsetMark = offsets[offsets.length - 1];
valueColumn.get(valueVector, offsets, offset.getCurrentVectorSize());
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnAndIndexSupplier.java
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnAndIndexSupplier.java
index 6fa2fe6d0f0..1bde18e188a 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnAndIndexSupplier.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnAndIndexSupplier.java
@@ -51,7 +51,9 @@ import org.apache.druid.segment.column.TypeSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.BitmapSerdeFactory;
import org.apache.druid.segment.data.ColumnarDoubles;
+import org.apache.druid.segment.data.ColumnarInts;
import org.apache.druid.segment.data.CompressedColumnarDoublesSuppliers;
+import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSupplier;
import org.apache.druid.segment.data.FixedIndexed;
import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.data.VByte;
@@ -109,6 +111,11 @@ public class ScalarDoubleColumnAndIndexSupplier implements
Supplier<NestedCommon
columnName,
ColumnSerializerUtils.DOUBLE_VALUE_COLUMN_FILE_NAME
);
+ final ByteBuffer encodedValuesBuffer =
NestedCommonFormatColumnPartSerde.loadInternalFile(
+ mapper,
+ columnName,
+ ColumnSerializerUtils.ENCODED_VALUE_COLUMN_FILE_NAME
+ );
final Supplier<FixedIndexed<Double>> doubleDictionarySupplier;
if (parent != null) {
@@ -128,6 +135,12 @@ public class ScalarDoubleColumnAndIndexSupplier implements
Supplier<NestedCommon
);
}
+ final CompressedVSizeColumnarIntsSupplier encodedCol =
CompressedVSizeColumnarIntsSupplier.fromByteBuffer(
+ encodedValuesBuffer,
+ byteOrder,
+ columnBuilder.getFileMapper()
+ );
+
final Supplier<ColumnarDoubles> doubles =
CompressedColumnarDoublesSuppliers.fromByteBuffer(
doublesValueColumn,
byteOrder
@@ -144,6 +157,7 @@ public class ScalarDoubleColumnAndIndexSupplier implements
Supplier<NestedCommon
);
return new ScalarDoubleColumnAndIndexSupplier(
doubleDictionarySupplier,
+ encodedCol,
doubles,
rBitmaps,
bitmapSerdeFactory.getBitmapFactory(),
@@ -160,6 +174,7 @@ public class ScalarDoubleColumnAndIndexSupplier implements
Supplier<NestedCommon
private final Supplier<FixedIndexed<Double>> doubleDictionarySupplier;
+ private final Supplier<ColumnarInts> encodedValuesSupplier;
private final Supplier<ColumnarDoubles> valueColumnSupplier;
private final GenericIndexed<ImmutableBitmap> valueIndexes;
@@ -170,6 +185,7 @@ public class ScalarDoubleColumnAndIndexSupplier implements
Supplier<NestedCommon
private ScalarDoubleColumnAndIndexSupplier(
Supplier<FixedIndexed<Double>> longDictionary,
+ Supplier<ColumnarInts> encodedValuesSupplier,
Supplier<ColumnarDoubles> valueColumnSupplier,
GenericIndexed<ImmutableBitmap> valueIndexes,
BitmapFactory bitmapFactory,
@@ -177,6 +193,7 @@ public class ScalarDoubleColumnAndIndexSupplier implements
Supplier<NestedCommon
)
{
this.doubleDictionarySupplier = longDictionary;
+ this.encodedValuesSupplier = encodedValuesSupplier;
this.valueColumnSupplier = valueColumnSupplier;
this.valueIndexes = valueIndexes;
this.bitmapFactory = bitmapFactory;
@@ -189,8 +206,10 @@ public class ScalarDoubleColumnAndIndexSupplier implements
Supplier<NestedCommon
{
return new ScalarDoubleColumn(
doubleDictionarySupplier.get(),
+ encodedValuesSupplier,
valueColumnSupplier.get(),
- nullValueBitmap
+ nullValueBitmap,
+ bitmapFactory
);
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumn.java
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumn.java
index 8a54ff31278..4d5db6a45f4 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumn.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumn.java
@@ -19,6 +19,7 @@
package org.apache.druid.segment.nested;
+import org.apache.druid.collections.bitmap.BitmapFactory;
import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.common.semantic.SemanticUtils;
@@ -26,6 +27,7 @@ import
org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.LongColumnSelector;
import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.data.ColumnarInts;
import org.apache.druid.segment.data.ColumnarLongs;
import org.apache.druid.segment.data.FixedIndexed;
import org.apache.druid.segment.data.Indexed;
@@ -40,6 +42,7 @@ import org.roaringbitmap.PeekableIntIterator;
import javax.annotation.Nullable;
import java.util.Map;
import java.util.function.Function;
+import java.util.function.Supplier;
/**
* {@link NestedCommonFormatColumn} for {@link ColumnType#LONG}
@@ -50,18 +53,24 @@ public class ScalarLongColumn implements
NestedCommonFormatColumn
SemanticUtils.makeAsMap(ScalarLongColumn.class);
private final FixedIndexed<Long> longDictionary;
+ private final Supplier<ColumnarInts> encodedValuesSupplier;
private final ColumnarLongs valueColumn;
- private final ImmutableBitmap nullValueIndex;
+ private final ImmutableBitmap nullValueBitmap;
+ private final BitmapFactory bitmapFactory;
public ScalarLongColumn(
FixedIndexed<Long> longDictionary,
+ Supplier<ColumnarInts> encodedValuesSupplier,
ColumnarLongs valueColumn,
- ImmutableBitmap nullValueIndex
+ ImmutableBitmap nullValueBitmap,
+ BitmapFactory bitmapFactory
)
{
this.longDictionary = longDictionary;
+ this.encodedValuesSupplier = encodedValuesSupplier;
this.valueColumn = valueColumn;
- this.nullValueIndex = nullValueIndex;
+ this.nullValueBitmap = nullValueBitmap;
+ this.bitmapFactory = bitmapFactory;
}
@@ -82,7 +91,7 @@ public class ScalarLongColumn implements
NestedCommonFormatColumn
{
return new LongColumnSelector()
{
- private PeekableIntIterator nullIterator =
nullValueIndex.peekableIterator();
+ private PeekableIntIterator nullIterator =
nullValueBitmap.peekableIterator();
private int nullMark = -1;
private int offsetMark = -1;
@@ -96,7 +105,7 @@ public class ScalarLongColumn implements
NestedCommonFormatColumn
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("longColumn", valueColumn);
- inspector.visit("nullBitmap", nullValueIndex);
+ inspector.visit("nullBitmap", nullValueBitmap);
}
@Override
@@ -109,7 +118,7 @@ public class ScalarLongColumn implements
NestedCommonFormatColumn
if (i < offsetMark) {
// offset was reset, reset iterator state
nullMark = -1;
- nullIterator = nullValueIndex.peekableIterator();
+ nullIterator = nullValueBitmap.peekableIterator();
}
offsetMark = i;
if (nullMark < i) {
@@ -134,7 +143,7 @@ public class ScalarLongColumn implements
NestedCommonFormatColumn
private int id = ReadableVectorInspector.NULL_ID;
@Nullable
- private PeekableIntIterator nullIterator =
nullValueIndex.peekableIterator();
+ private PeekableIntIterator nullIterator =
nullValueBitmap.peekableIterator();
private int offsetMark = -1;
@Override
@@ -163,14 +172,14 @@ public class ScalarLongColumn implements
NestedCommonFormatColumn
if (offset.isContiguous()) {
if (offset.getStartOffset() < offsetMark) {
- nullIterator = nullValueIndex.peekableIterator();
+ nullIterator = nullValueBitmap.peekableIterator();
}
offsetMark = offset.getStartOffset() + offset.getCurrentVectorSize();
valueColumn.get(valueVector, offset.getStartOffset(),
offset.getCurrentVectorSize());
} else {
final int[] offsets = offset.getOffsets();
if (offsets[offsets.length - 1] < offsetMark) {
- nullIterator = nullValueIndex.peekableIterator();
+ nullIterator = nullValueBitmap.peekableIterator();
}
offsetMark = offsets[offsets.length - 1];
valueColumn.get(valueVector, offsets, offset.getCurrentVectorSize());
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnAndIndexSupplier.java
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnAndIndexSupplier.java
index a8d1fa057d8..1b1b9fb97e7 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnAndIndexSupplier.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnAndIndexSupplier.java
@@ -49,8 +49,10 @@ import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.TypeSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.BitmapSerdeFactory;
+import org.apache.druid.segment.data.ColumnarInts;
import org.apache.druid.segment.data.ColumnarLongs;
import org.apache.druid.segment.data.CompressedColumnarLongsSupplier;
+import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSupplier;
import org.apache.druid.segment.data.FixedIndexed;
import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.data.VByte;
@@ -103,6 +105,11 @@ public class ScalarLongColumnAndIndexSupplier implements
Supplier<NestedCommonFo
final SmooshedFileMapper mapper = columnBuilder.getFileMapper();
+ final ByteBuffer encodedValuesBuffer =
NestedCommonFormatColumnPartSerde.loadInternalFile(
+ mapper,
+ columnName,
+ ColumnSerializerUtils.ENCODED_VALUE_COLUMN_FILE_NAME
+ );
final ByteBuffer longsValueColumn =
NestedCommonFormatColumnPartSerde.loadInternalFile(
mapper,
columnName,
@@ -137,12 +144,19 @@ public class ScalarLongColumnAndIndexSupplier implements
Supplier<NestedCommonFo
);
}
+ final CompressedVSizeColumnarIntsSupplier encodedCol =
CompressedVSizeColumnarIntsSupplier.fromByteBuffer(
+ encodedValuesBuffer,
+ byteOrder,
+ columnBuilder.getFileMapper()
+ );
+
final Supplier<ColumnarLongs> longs =
CompressedColumnarLongsSupplier.fromByteBuffer(
longsValueColumn,
byteOrder
);
return new ScalarLongColumnAndIndexSupplier(
longDictionarySupplier,
+ encodedCol,
longs,
rBitmaps,
bitmapSerdeFactory.getBitmapFactory(),
@@ -159,6 +173,7 @@ public class ScalarLongColumnAndIndexSupplier implements
Supplier<NestedCommonFo
private final Supplier<FixedIndexed<Long>> longDictionarySupplier;
+ private final Supplier<ColumnarInts> encodedValuesSupplier;
private final Supplier<ColumnarLongs> valueColumnSupplier;
private final GenericIndexed<ImmutableBitmap> valueIndexes;
@@ -170,6 +185,7 @@ public class ScalarLongColumnAndIndexSupplier implements
Supplier<NestedCommonFo
private ScalarLongColumnAndIndexSupplier(
Supplier<FixedIndexed<Long>> longDictionarySupplier,
+ Supplier<ColumnarInts> encodedValuesSupplier,
Supplier<ColumnarLongs> valueColumnSupplier,
GenericIndexed<ImmutableBitmap> valueIndexes,
BitmapFactory bitmapFactory,
@@ -177,6 +193,7 @@ public class ScalarLongColumnAndIndexSupplier implements
Supplier<NestedCommonFo
)
{
this.longDictionarySupplier = longDictionarySupplier;
+ this.encodedValuesSupplier = encodedValuesSupplier;
this.valueColumnSupplier = valueColumnSupplier;
this.valueIndexes = valueIndexes;
this.bitmapFactory = bitmapFactory;
@@ -189,8 +206,10 @@ public class ScalarLongColumnAndIndexSupplier implements
Supplier<NestedCommonFo
{
return new ScalarLongColumn(
longDictionarySupplier.get(),
+ encodedValuesSupplier,
valueColumnSupplier.get(),
- nullValueBitmap
+ nullValueBitmap,
+ bitmapFactory
);
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarStringColumnAndIndexSupplier.java
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarStringColumnAndIndexSupplier.java
index 7f1111c2e8b..ee7fe1475b7 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarStringColumnAndIndexSupplier.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarStringColumnAndIndexSupplier.java
@@ -117,6 +117,7 @@ public class ScalarStringColumnAndIndexSupplier implements
Supplier<NestedCommon
private final Supplier<ColumnarInts> encodedColumnSupplier;
private final GenericIndexed<ImmutableBitmap> valueIndexes;
private final ColumnIndexSupplier stringIndexSupplier;
+ private final BitmapSerdeFactory serdeFactory;
private ScalarStringColumnAndIndexSupplier(
Supplier<? extends Indexed<ByteBuffer>> dictionarySupplier,
@@ -128,6 +129,7 @@ public class ScalarStringColumnAndIndexSupplier implements
Supplier<NestedCommon
this.dictionarySupplier = dictionarySupplier;
this.encodedColumnSupplier = encodedColumnSupplier;
this.valueIndexes = valueIndexes;
+ this.serdeFactory = serdeFactory;
this.stringIndexSupplier = new StringUtf8ColumnIndexSupplier<>(
serdeFactory.getBitmapFactory(),
dictionarySupplier,
@@ -139,7 +141,12 @@ public class ScalarStringColumnAndIndexSupplier implements
Supplier<NestedCommon
@Override
public NestedCommonFormatColumn get()
{
- return new StringUtf8DictionaryEncodedColumn(encodedColumnSupplier.get(),
null, dictionarySupplier.get());
+ return new StringUtf8DictionaryEncodedColumn(
+ encodedColumnSupplier.get(),
+ null,
+ dictionarySupplier.get(),
+ serdeFactory.getBitmapFactory()
+ );
}
@Nullable
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumn.java
b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumn.java
index dfd6307148d..8125fe53100 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumn.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumn.java
@@ -23,6 +23,7 @@ import com.google.common.primitives.Doubles;
import com.google.common.primitives.Floats;
import it.unimi.dsi.fastutil.ints.IntArraySet;
import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.collections.bitmap.BitmapFactory;
import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.common.guava.GuavaUtils;
import org.apache.druid.common.semantic.SemanticUtils;
@@ -96,6 +97,7 @@ public class VariantColumn<TStringDictionary extends
Indexed<ByteBuffer>>
private final ExpressionType logicalExpressionType;
@Nullable
private final FieldTypeInfo.TypeSet variantTypes;
+ private final BitmapFactory bitmapFactory;
private final int adjustLongId;
private final int adjustDoubleId;
private final int adjustArrayId;
@@ -108,7 +110,8 @@ public class VariantColumn<TStringDictionary extends
Indexed<ByteBuffer>>
ColumnarInts encodedValueColumn,
ImmutableBitmap nullValueBitmap,
ColumnType logicalType,
- @Nullable Byte variantTypeSetByte
+ @Nullable Byte variantTypeSetByte,
+ BitmapFactory bitmapFactory
)
{
this.stringDictionary = stringDictionary;
@@ -119,6 +122,7 @@ public class VariantColumn<TStringDictionary extends
Indexed<ByteBuffer>>
this.nullValueBitmap = nullValueBitmap;
this.logicalExpressionType =
ExpressionType.fromColumnTypeStrict(logicalType);
this.variantTypes = variantTypeSetByte == null ? null : new
FieldTypeInfo.TypeSet(variantTypeSetByte);
+ this.bitmapFactory = bitmapFactory;
// use the variant type bytes if set, in current code the logical type
should have been computed via this same means
// however older versions of the code had a bug which could incorrectly
classify mixed types as nested data
if (variantTypeSetByte != null) {
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnAndIndexSupplier.java
b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnAndIndexSupplier.java
index b93235b5bab..6a2ec476976 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnAndIndexSupplier.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnAndIndexSupplier.java
@@ -268,7 +268,8 @@ public class VariantColumnAndIndexSupplier implements
Supplier<NestedCommonForma
encodedValueColumnSupplier.get(),
nullValueBitmap,
logicalType,
- variantTypeSetByte
+ variantTypeSetByte,
+ bitmapFactory
);
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java
b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java
index 9adb8fac2b7..5b706b44b7f 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java
@@ -20,12 +20,11 @@
package org.apache.druid.segment.nested;
import com.google.common.base.Preconditions;
-import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
-import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap;
import it.unimi.dsi.fastutil.ints.IntIterator;
import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.collections.bitmap.MutableBitmap;
import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
@@ -47,6 +46,7 @@ import
org.apache.druid.segment.data.FrontCodedIntArrayIndexedWriter;
import org.apache.druid.segment.data.GenericIndexedWriter;
import org.apache.druid.segment.data.SingleValueColumnarIntsSerializer;
import org.apache.druid.segment.serde.ColumnSerializerUtils;
+import org.apache.druid.segment.serde.Serializer;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import javax.annotation.Nullable;
@@ -55,6 +55,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.WritableByteChannel;
+import java.util.Arrays;
/**
* Serializer for a {@link NestedCommonFormatColumn} for single type arrays
and mixed type columns, but not columns
@@ -80,7 +81,6 @@ public class VariantColumnSerializer extends
NestedCommonFormatColumnSerializer
private boolean dictionarySerialized = false;
private FixedIndexedIntWriter intermediateValueWriter;
- private ByteBuffer columnNameBytes = null;
private boolean hasNulls;
private boolean writeDictionary = true;
@Nullable
@@ -88,6 +88,8 @@ public class VariantColumnSerializer extends
NestedCommonFormatColumnSerializer
@Nullable
private final Byte variantTypeSetByte;
+ private InternalSerializer internalSerializer = null;
+
public VariantColumnSerializer(
String name,
@Nullable ColumnType logicalType,
@@ -299,155 +301,246 @@ public class VariantColumnSerializer extends
NestedCommonFormatColumnSerializer
}
}
- private void closeForWrite()
+ private void closeForWrite() throws IOException
{
if (!closedForWrite) {
- columnNameBytes = computeFilenameBytes();
+ // write out compressed dictionaryId int column, bitmap indexes, and
array element bitmap indexes
+ // by iterating the intermediate value column. The intermediate value column
should be replaced someday by a cooler
+ // compressed int column writer that allows easy iteration of the values
it writes out, so that we could just
+ // build the bitmap indexes here instead of doing both things
+ String filenameBase = StringUtils.format("%s.forward_dim", name);
+ final int scalarCardinality = dictionaryIdLookup.getStringCardinality()
+ + dictionaryIdLookup.getLongCardinality()
+ +
dictionaryIdLookup.getDoubleCardinality();
+ final int cardinality = scalarCardinality +
dictionaryIdLookup.getArrayCardinality();
+ final CompressionStrategy compression =
indexSpec.getDimensionCompression();
+ final CompressionStrategy compressionToUse;
+ if (compression != CompressionStrategy.UNCOMPRESSED && compression !=
CompressionStrategy.NONE) {
+ compressionToUse = compression;
+ } else {
+ compressionToUse = CompressionStrategy.LZ4;
+ }
+
+ final SingleValueColumnarIntsSerializer encodedValueSerializer =
CompressedVSizeColumnarIntsSerializer.create(
+ name,
+ segmentWriteOutMedium,
+ filenameBase,
+ cardinality,
+ compressionToUse,
+ segmentWriteOutMedium.getCloser()
+ );
+ encodedValueSerializer.open();
+
+ final GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter = new
GenericIndexedWriter<>(
+ segmentWriteOutMedium,
+ name,
+ indexSpec.getBitmapSerdeFactory().getObjectStrategy()
+ );
+ bitmapIndexWriter.open();
+ bitmapIndexWriter.setObjectsNotSorted();
+ final MutableBitmap[] bitmaps = new MutableBitmap[cardinality];
+ final MutableBitmap[] arrayElements = new
MutableBitmap[scalarCardinality];
+ for (int i = 0; i < bitmaps.length; i++) {
+ bitmaps[i] =
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
+ }
+ final GenericIndexedWriter<ImmutableBitmap> arrayElementIndexWriter =
new GenericIndexedWriter<>(
+ segmentWriteOutMedium,
+ name + "_arrays",
+ indexSpec.getBitmapSerdeFactory().getObjectStrategy()
+ );
+ arrayElementIndexWriter.open();
+ arrayElementIndexWriter.setObjectsNotSorted();
+
+ final IntIterator rows = intermediateValueWriter.getIterator();
+ int rowCount = 0;
+ final int arrayBaseId = dictionaryIdLookup.getStringCardinality()
+ + dictionaryIdLookup.getLongCardinality()
+ + dictionaryIdLookup.getDoubleCardinality();
+ while (rows.hasNext()) {
+ final int dictId = rows.nextInt();
+ encodedValueSerializer.addValue(dictId);
+ bitmaps[dictId].add(rowCount);
+ if (dictId >= arrayBaseId) {
+ int[] array = dictionaryIdLookup.getArrayValue(dictId);
+ for (int elementId : array) {
+ MutableBitmap bitmap = arrayElements[elementId];
+ if (bitmap == null) {
+ bitmap =
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
+ arrayElements[elementId] = bitmap;
+ }
+ bitmap.add(rowCount);
+ }
+ }
+ rowCount++;
+ }
+
+ for (int i = 0; i < bitmaps.length; i++) {
+ final MutableBitmap bitmap = bitmaps[i];
+ bitmapIndexWriter.write(
+
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(bitmap)
+ );
+ bitmaps[i] = null; // Reclaim memory
+ }
+ if (writeDictionary) {
+ for (int i = 0; i < arrayElements.length; ++i) {
+ if (arrayElements[i] != null) {
+ arrayElementDictionaryWriter.write(i);
+ arrayElementIndexWriter.write(arrayElements[i]);
+ }
+ }
+ }
+
closedForWrite = true;
+ internalSerializer = new InternalSerializer(
+ name,
+ variantTypeSetByte,
+ dictionaryWriter,
+ longDictionaryWriter,
+ doubleDictionaryWriter,
+ arrayDictionaryWriter,
+ encodedValueSerializer,
+ bitmapIndexWriter,
+ arrayElementDictionaryWriter,
+ arrayElementIndexWriter,
+ dictionaryIdLookup,
+ writeDictionary
+ );
}
}
@Override
- public long getSerializedSize()
+ public long getSerializedSize() throws IOException
{
closeForWrite();
-
- long size = 1 + columnNameBytes.capacity();
- // the value dictionaries, raw column, and null index are all stored in
separate files
- if (variantTypeSetByte != null) {
- size += 1;
- }
- return size;
+ return internalSerializer.getSerializedSize();
}
@Override
- public void writeTo(
- WritableByteChannel channel,
- FileSmoosher smoosher
- ) throws IOException
+ public void writeTo(WritableByteChannel channel, FileSmoosher smoosher)
throws IOException
{
- Preconditions.checkState(closedForWrite, "Not closed yet!");
- if (writeDictionary) {
- Preconditions.checkArgument(dictionaryWriter.isSorted(), "Dictionary not
sorted?!?");
- }
-
- // write out compressed dictionaryId int column, bitmap indexes, and array
element bitmap indexes
- // by iterating intermediate value column the intermediate value column
should be replaced someday by a cooler
- // compressed int column writer that allows easy iteration of the values
it writes out, so that we could just
- // build the bitmap indexes here instead of doing both things
- String filenameBase = StringUtils.format("%s.forward_dim", name);
- final int cardinality = dictionaryIdLookup.getStringCardinality()
- + dictionaryIdLookup.getLongCardinality()
- + dictionaryIdLookup.getDoubleCardinality()
- + dictionaryIdLookup.getArrayCardinality();
- final CompressionStrategy compression =
indexSpec.getDimensionCompression();
- final CompressionStrategy compressionToUse;
- if (compression != CompressionStrategy.UNCOMPRESSED && compression !=
CompressionStrategy.NONE) {
- compressionToUse = compression;
- } else {
- compressionToUse = CompressionStrategy.LZ4;
- }
-
- final SingleValueColumnarIntsSerializer encodedValueSerializer =
CompressedVSizeColumnarIntsSerializer.create(
- name,
- segmentWriteOutMedium,
- filenameBase,
- cardinality,
- compressionToUse,
- segmentWriteOutMedium.getCloser()
- );
- encodedValueSerializer.open();
+ closeForWrite();
+ internalSerializer.writeTo(channel, smoosher);
+ }
- final GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter = new
GenericIndexedWriter<>(
- segmentWriteOutMedium,
- name,
- indexSpec.getBitmapSerdeFactory().getObjectStrategy()
- );
- bitmapIndexWriter.open();
- bitmapIndexWriter.setObjectsNotSorted();
- final MutableBitmap[] bitmaps = new MutableBitmap[cardinality];
- final Int2ObjectRBTreeMap<MutableBitmap> arrayElements = new
Int2ObjectRBTreeMap<>();
- for (int i = 0; i < bitmaps.length; i++) {
- bitmaps[i] =
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
- }
- final GenericIndexedWriter<ImmutableBitmap> arrayElementIndexWriter = new
GenericIndexedWriter<>(
- segmentWriteOutMedium,
- name + "_arrays",
- indexSpec.getBitmapSerdeFactory().getObjectStrategy()
- );
- arrayElementIndexWriter.open();
- arrayElementIndexWriter.setObjectsNotSorted();
-
- final IntIterator rows = intermediateValueWriter.getIterator();
- int rowCount = 0;
- final int arrayBaseId = dictionaryIdLookup.getStringCardinality()
- + dictionaryIdLookup.getLongCardinality()
- + dictionaryIdLookup.getDoubleCardinality();
- while (rows.hasNext()) {
- final int dictId = rows.nextInt();
- encodedValueSerializer.addValue(dictId);
- bitmaps[dictId].add(rowCount);
- if (dictId >= arrayBaseId) {
- int[] array = dictionaryIdLookup.getArrayValue(dictId);
- for (int elementId : array) {
- arrayElements.computeIfAbsent(
- elementId,
- (id) ->
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap()
- ).add(rowCount);
+ /**
+ * Internal serializer used to serialize a {@link VariantColumn}. Contains
the logic to write out the column to a
+ * {@link FileSmoosher}. Created by {@link VariantColumnSerializer} once it
is closed for writes.
+ */
+ public static class InternalSerializer implements Serializer
+ {
+ private final String columnName;
+ private final ByteBuffer columnNameBytes;
+ private final Byte variantTypeSetByte;
+
+ private final DictionaryWriter<String> dictionaryWriter;
+ private final FixedIndexedWriter<Long> longDictionaryWriter;
+ private final FixedIndexedWriter<Double> doubleDictionaryWriter;
+ private final FrontCodedIntArrayIndexedWriter arrayDictionaryWriter;
+
+ private final SingleValueColumnarIntsSerializer encodedValueSerializer;
+ private final GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter;
+
+ private final FixedIndexedIntWriter arrayElementDictionaryWriter;
+ private final GenericIndexedWriter<ImmutableBitmap>
arrayElementIndexWriter;
+ private final boolean writeDictionary;
+
+ private final DictionaryIdLookup dictionaryIdLookup;
+
+ public InternalSerializer(
+ String columnName,
+ Byte variantTypeSetByte,
+ DictionaryWriter<String> dictionaryWriter,
+ FixedIndexedWriter<Long> longDictionaryWriter,
+ FixedIndexedWriter<Double> doubleDictionaryWriter,
+ FrontCodedIntArrayIndexedWriter arrayDictionaryWriter,
+ SingleValueColumnarIntsSerializer encodedValueSerializer,
+ GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter,
+ FixedIndexedIntWriter arrayElementDictionaryWriter,
+ GenericIndexedWriter<ImmutableBitmap> arrayElementIndexWriter,
+ DictionaryIdLookup dictionaryIdLookup,
+ boolean writeDictionary
+ )
+ {
+ this.columnName = columnName;
+ this.columnNameBytes =
ColumnSerializerUtils.stringToUtf8InVSizeByteBuffer(columnName);
+ this.variantTypeSetByte = variantTypeSetByte;
+ this.dictionaryWriter = dictionaryWriter;
+ this.longDictionaryWriter = longDictionaryWriter;
+ this.doubleDictionaryWriter = doubleDictionaryWriter;
+ this.arrayDictionaryWriter = arrayDictionaryWriter;
+ this.encodedValueSerializer = encodedValueSerializer;
+ this.bitmapIndexWriter = bitmapIndexWriter;
+ this.arrayElementDictionaryWriter = arrayElementDictionaryWriter;
+ this.arrayElementIndexWriter = arrayElementIndexWriter;
+ this.writeDictionary = writeDictionary;
+ this.dictionaryIdLookup = dictionaryIdLookup;
+
+ boolean[] dictionariesSorted = new boolean[]{
+ dictionaryWriter.isSorted(),
+ longDictionaryWriter.isSorted(),
+ doubleDictionaryWriter.isSorted(),
+ arrayDictionaryWriter.isSorted()
+ };
+ for (boolean sorted : dictionariesSorted) {
+ if (writeDictionary && !sorted) {
+ throw DruidException.defensive(
+ "Dictionary is not sorted? [%s] Should always be sorted",
+ Arrays.toString(dictionariesSorted)
+ );
}
}
- rowCount++;
}
- for (int i = 0; i < bitmaps.length; i++) {
- final MutableBitmap bitmap = bitmaps[i];
- bitmapIndexWriter.write(
-
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(bitmap)
- );
- bitmaps[i] = null; // Reclaim memory
- }
- if (writeDictionary) {
- for (Int2ObjectMap.Entry<MutableBitmap> arrayElement :
arrayElements.int2ObjectEntrySet()) {
- arrayElementDictionaryWriter.write(arrayElement.getIntKey());
- arrayElementIndexWriter.write(
-
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(arrayElement.getValue())
- );
+ @Override
+ public long getSerializedSize()
+ {
+ long size = 1 + columnNameBytes.capacity();
+ // the value dictionaries, indexes, array element indexes and dictionary
id columns are all stored in separate files
+ if (variantTypeSetByte != null) {
+ size += 1;
}
+ return size;
}
- writeV0Header(channel, columnNameBytes);
- if (variantTypeSetByte != null) {
- channel.write(ByteBuffer.wrap(new byte[]{variantTypeSetByte}));
- }
-
- if (writeDictionary) {
- if (dictionaryIdLookup.getStringBufferMapper() != null) {
- copyFromTempSmoosh(smoosher,
dictionaryIdLookup.getStringBufferMapper());
- } else {
- writeInternal(smoosher, dictionaryWriter,
ColumnSerializerUtils.STRING_DICTIONARY_FILE_NAME);
- }
- if (dictionaryIdLookup.getLongBufferMapper() != null) {
- copyFromTempSmoosh(smoosher, dictionaryIdLookup.getLongBufferMapper());
- } else {
- writeInternal(smoosher, longDictionaryWriter,
ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME);
- }
- if (dictionaryIdLookup.getDoubleBufferMapper() != null) {
- copyFromTempSmoosh(smoosher,
dictionaryIdLookup.getDoubleBufferMapper());
- } else {
- writeInternal(smoosher, doubleDictionaryWriter,
ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME);
+ @Override
+ public void writeTo(WritableByteChannel channel, FileSmoosher smoosher)
throws IOException
+ {
+ writeV0Header(channel, columnNameBytes);
+ if (variantTypeSetByte != null) {
+ channel.write(ByteBuffer.wrap(new byte[]{variantTypeSetByte}));
}
- if (dictionaryIdLookup.getArrayBufferMapper() != null) {
- copyFromTempSmoosh(smoosher,
dictionaryIdLookup.getArrayBufferMapper());
- } else {
- writeInternal(smoosher, arrayDictionaryWriter,
ColumnSerializerUtils.ARRAY_DICTIONARY_FILE_NAME);
+
+ if (writeDictionary) {
+ if (dictionaryIdLookup.getStringBufferMapper() != null) {
+ copyFromTempSmoosh(smoosher,
dictionaryIdLookup.getStringBufferMapper());
+ } else {
+ ColumnSerializerUtils.writeInternal(smoosher, dictionaryWriter,
columnName, ColumnSerializerUtils.STRING_DICTIONARY_FILE_NAME);
+ }
+
+ if (dictionaryIdLookup.getLongBufferMapper() != null) {
+ copyFromTempSmoosh(smoosher,
dictionaryIdLookup.getLongBufferMapper());
+ } else {
+ ColumnSerializerUtils.writeInternal(smoosher, longDictionaryWriter,
columnName, ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME);
+ }
+
+ if (dictionaryIdLookup.getDoubleBufferMapper() != null) {
+ copyFromTempSmoosh(smoosher,
dictionaryIdLookup.getDoubleBufferMapper());
+ } else {
+ ColumnSerializerUtils.writeInternal(smoosher,
doubleDictionaryWriter, columnName,
ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME);
+ }
+ if (dictionaryIdLookup.getArrayBufferMapper() != null) {
+ copyFromTempSmoosh(smoosher,
dictionaryIdLookup.getArrayBufferMapper());
+ } else {
+ ColumnSerializerUtils.writeInternal(smoosher, arrayDictionaryWriter,
columnName, ColumnSerializerUtils.ARRAY_DICTIONARY_FILE_NAME);
+ }
+
+ ColumnSerializerUtils.writeInternal(smoosher,
arrayElementDictionaryWriter, columnName,
ColumnSerializerUtils.ARRAY_ELEMENT_DICTIONARY_FILE_NAME);
}
+ ColumnSerializerUtils.writeInternal(smoosher, encodedValueSerializer,
columnName, ColumnSerializerUtils.ENCODED_VALUE_COLUMN_FILE_NAME);
+ ColumnSerializerUtils.writeInternal(smoosher, bitmapIndexWriter,
columnName, ColumnSerializerUtils.BITMAP_INDEX_FILE_NAME);
+ ColumnSerializerUtils.writeInternal(smoosher, arrayElementIndexWriter,
columnName, ColumnSerializerUtils.ARRAY_ELEMENT_BITMAP_INDEX_FILE_NAME);
- writeInternal(smoosher, arrayElementDictionaryWriter,
ColumnSerializerUtils.ARRAY_ELEMENT_DICTIONARY_FILE_NAME);
+ log.info("Column [%s] serialized successfully.", columnName);
}
- writeInternal(smoosher, encodedValueSerializer,
ColumnSerializerUtils.ENCODED_VALUE_COLUMN_FILE_NAME);
- writeInternal(smoosher, bitmapIndexWriter,
ColumnSerializerUtils.BITMAP_INDEX_FILE_NAME);
- writeInternal(smoosher, arrayElementIndexWriter,
ColumnSerializerUtils.ARRAY_ELEMENT_BITMAP_INDEX_FILE_NAME);
-
- log.info("Column [%s] serialized successfully.", name);
}
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/serde/DictionaryEncodedColumnPartSerde.java
b/processing/src/main/java/org/apache/druid/segment/serde/DictionaryEncodedColumnPartSerde.java
index cf9b7c70d56..02e7f5b4d39 100644
---
a/processing/src/main/java/org/apache/druid/segment/serde/DictionaryEncodedColumnPartSerde.java
+++
b/processing/src/main/java/org/apache/druid/segment/serde/DictionaryEncodedColumnPartSerde.java
@@ -344,7 +344,8 @@ public class DictionaryEncodedColumnPartSerde implements
ColumnPartSerde
final StringUtf8DictionaryEncodedColumnSupplier<?> supplier = new
StringUtf8DictionaryEncodedColumnSupplier<>(
dictionarySupplier,
rSingleValuedColumn,
- rMultiValuedColumn
+ rMultiValuedColumn,
+ bitmapSerdeFactory.getBitmapFactory()
);
builder.setHasMultipleValues(hasMultipleValues)
.setHasNulls(hasNulls)
diff --git
a/processing/src/main/java/org/apache/druid/segment/serde/StringUtf8DictionaryEncodedColumnSupplier.java
b/processing/src/main/java/org/apache/druid/segment/serde/StringUtf8DictionaryEncodedColumnSupplier.java
index 33de1869e27..6d443ebf11d 100644
---
a/processing/src/main/java/org/apache/druid/segment/serde/StringUtf8DictionaryEncodedColumnSupplier.java
+++
b/processing/src/main/java/org/apache/druid/segment/serde/StringUtf8DictionaryEncodedColumnSupplier.java
@@ -20,6 +20,7 @@
package org.apache.druid.segment.serde;
import com.google.common.base.Supplier;
+import org.apache.druid.collections.bitmap.BitmapFactory;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.segment.column.DictionaryEncodedColumn;
import org.apache.druid.segment.column.StringUtf8DictionaryEncodedColumn;
@@ -38,16 +39,19 @@ public class
StringUtf8DictionaryEncodedColumnSupplier<TIndexed extends Indexed<
private final Supplier<TIndexed> utf8Dictionary;
private final @Nullable Supplier<ColumnarInts> singleValuedColumn;
private final @Nullable Supplier<ColumnarMultiInts> multiValuedColumn;
+ private final BitmapFactory bitmapFactory;
public StringUtf8DictionaryEncodedColumnSupplier(
Supplier<TIndexed> utf8Dictionary,
@Nullable Supplier<ColumnarInts> singleValuedColumn,
- @Nullable Supplier<ColumnarMultiInts> multiValuedColumn
+ @Nullable Supplier<ColumnarMultiInts> multiValuedColumn,
+ BitmapFactory bitmapFactory
)
{
this.utf8Dictionary = utf8Dictionary;
this.singleValuedColumn = singleValuedColumn;
this.multiValuedColumn = multiValuedColumn;
+ this.bitmapFactory = bitmapFactory;
}
public Supplier<TIndexed> getDictionary()
@@ -64,19 +68,22 @@ public class
StringUtf8DictionaryEncodedColumnSupplier<TIndexed extends Indexed<
return new StringUtf8DictionaryEncodedColumn(
singleValuedColumn != null ? new
CombineFirstTwoValuesColumnarInts(singleValuedColumn.get()) : null,
multiValuedColumn != null ? new
CombineFirstTwoValuesColumnarMultiInts(multiValuedColumn.get()) : null,
- CombineFirstTwoEntriesIndexed.returnNull(suppliedUtf8Dictionary)
+ CombineFirstTwoEntriesIndexed.returnNull(suppliedUtf8Dictionary),
+ bitmapFactory
);
} else if
(NullHandling.mustReplaceFirstValueWithNullInDictionary(suppliedUtf8Dictionary))
{
return new StringUtf8DictionaryEncodedColumn(
singleValuedColumn != null ? singleValuedColumn.get() : null,
multiValuedColumn != null ? multiValuedColumn.get() : null,
- new ReplaceFirstValueWithNullIndexed<>(suppliedUtf8Dictionary)
+ new ReplaceFirstValueWithNullIndexed<>(suppliedUtf8Dictionary),
+ bitmapFactory
);
} else {
return new StringUtf8DictionaryEncodedColumn(
singleValuedColumn != null ? singleValuedColumn.get() : null,
multiValuedColumn != null ? multiValuedColumn.get() : null,
- suppliedUtf8Dictionary
+ suppliedUtf8Dictionary,
+ bitmapFactory
);
}
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/serde/cell/IOIterator.java
b/processing/src/main/java/org/apache/druid/segment/serde/cell/IOIterator.java
index 3931601dd4f..0825e0e46bd 100644
---
a/processing/src/main/java/org/apache/druid/segment/serde/cell/IOIterator.java
+++
b/processing/src/main/java/org/apache/druid/segment/serde/cell/IOIterator.java
@@ -30,10 +30,10 @@ import java.io.IOException;
*/
public interface IOIterator<T> extends Closeable
{
- boolean hasNext() throws IOException;
+ boolean hasNext();
T next() throws IOException;
@Override
- void close() throws IOException;
+ void close();
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/serde/cell/StagedSerde.java
b/processing/src/main/java/org/apache/druid/segment/serde/cell/StagedSerde.java
index ffbf00a9c40..476795b9f5e 100644
---
a/processing/src/main/java/org/apache/druid/segment/serde/cell/StagedSerde.java
+++
b/processing/src/main/java/org/apache/druid/segment/serde/cell/StagedSerde.java
@@ -64,6 +64,41 @@ import java.nio.ByteOrder;
*/
public interface StagedSerde<T>
{
+ static StagedSerde<byte[]> forBytes()
+ {
+ return new StagedSerde<byte[]>()
+ {
+ @Override
+ public byte[] serialize(byte[] value)
+ {
+ return value;
+ }
+
+ @Override
+ public byte[] deserialize(byte[] bytes)
+ {
+ return bytes;
+ }
+
+ @Override
+ public StorableBuffer serializeDelayed(@Nullable byte[] value)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Nullable
+ @Override
+ public byte[] deserialize(ByteBuffer byteBuffer)
+ {
+ byte[] retVal = new byte[byteBuffer.remaining()];
+ int position = byteBuffer.position();
+ byteBuffer.get(retVal);
+ byteBuffer.position(position);
+ return retVal;
+ }
+ };
+ }
+
/**
* Useful method when some computation is necessary to prepare for
serialization without actually writing out
* all the bytes in order to determine the serialized size. It allows
encapsulation of the size computation and
diff --git
a/processing/src/test/java/org/apache/druid/segment/filter/PredicateValueMatcherFactoryTest.java
b/processing/src/test/java/org/apache/druid/segment/filter/PredicateValueMatcherFactoryTest.java
index ff4dc11f220..0a79b97f523 100644
---
a/processing/src/test/java/org/apache/druid/segment/filter/PredicateValueMatcherFactoryTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/filter/PredicateValueMatcherFactoryTest.java
@@ -20,6 +20,7 @@
package org.apache.druid.segment.filter;
import com.google.common.collect.ImmutableList;
+import org.apache.druid.collections.bitmap.RoaringBitmapFactory;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.filter.SelectorPredicateFactory;
@@ -76,7 +77,8 @@ public class PredicateValueMatcherFactoryTest extends
InitializedNullHandlingTes
GenericIndexed.UTF8_STRATEGY
)::singleThreaded,
null,
- () ->
VSizeColumnarMultiInts.fromIterable(ImmutableList.of(VSizeColumnarInts.fromArray(new
int[]{1})))
+ () ->
VSizeColumnarMultiInts.fromIterable(ImmutableList.of(VSizeColumnarInts.fromArray(new
int[]{1}))),
+ new RoaringBitmapFactory()
);
final ValueMatcher matcher = forSelector("v2")
.makeDimensionProcessor(columnSupplier.get().makeDimensionSelector(new
SimpleAscendingOffset(1), null), true);
@@ -97,7 +99,8 @@ public class PredicateValueMatcherFactoryTest extends
InitializedNullHandlingTes
GenericIndexed.UTF8_STRATEGY
)::singleThreaded,
null,
- () ->
VSizeColumnarMultiInts.fromIterable(ImmutableList.of(VSizeColumnarInts.fromArray(new
int[]{1})))
+ () ->
VSizeColumnarMultiInts.fromIterable(ImmutableList.of(VSizeColumnarInts.fromArray(new
int[]{1}))),
+ new RoaringBitmapFactory()
);
final ValueMatcher matcher = forSelector("v3")
.makeDimensionProcessor(columnSupplier.get().makeDimensionSelector(new
SimpleAscendingOffset(1), null), true);
diff --git
a/processing/src/test/java/org/apache/druid/segment/filter/ValueMatchersTest.java
b/processing/src/test/java/org/apache/druid/segment/filter/ValueMatchersTest.java
index 86e4e0ee08b..bdb15ee445a 100644
---
a/processing/src/test/java/org/apache/druid/segment/filter/ValueMatchersTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/filter/ValueMatchersTest.java
@@ -20,6 +20,7 @@
package org.apache.druid.segment.filter;
import com.google.common.collect.ImmutableList;
+import org.apache.druid.collections.bitmap.RoaringBitmapFactory;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.filter.DruidObjectPredicate;
import org.apache.druid.query.filter.DruidPredicateMatch;
@@ -47,13 +48,15 @@ public class ValueMatchersTest extends
InitializedNullHandlingTest
@Before
public void setup()
{
+ final RoaringBitmapFactory bitmapFactory = new RoaringBitmapFactory();
supplierSingleConstant = new StringUtf8DictionaryEncodedColumnSupplier<>(
GenericIndexed.fromIterable(
ImmutableList.of(ByteBuffer.wrap(StringUtils.toUtf8("value"))),
GenericIndexed.UTF8_STRATEGY
)::singleThreaded,
() -> VSizeColumnarInts.fromArray(new int[]{0}),
- null
+ null,
+ bitmapFactory
);
supplierSingle = new StringUtf8DictionaryEncodedColumnSupplier<>(
GenericIndexed.fromIterable(
@@ -64,7 +67,8 @@ public class ValueMatchersTest extends
InitializedNullHandlingTest
GenericIndexed.UTF8_STRATEGY
)::singleThreaded,
() -> VSizeColumnarInts.fromArray(new int[]{0, 0, 1, 0, 1}),
- null
+ null,
+ bitmapFactory
);
supplierMulti = new StringUtf8DictionaryEncodedColumnSupplier<>(
GenericIndexed.fromIterable(
@@ -77,7 +81,8 @@ public class ValueMatchersTest extends
InitializedNullHandlingTest
VSizeColumnarInts.fromArray(new int[]{0, 0}),
VSizeColumnarInts.fromArray(new int[]{0})
)
- )
+ ),
+ bitmapFactory
);
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]