This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new b645d09c5dc move long and double nested field serialization to later
phase of serialization (#16769)
b645d09c5dc is described below
commit b645d09c5dc5791e538532efbd44e203e005a9fb
Author: Clint Wylie <[email protected]>
AuthorDate: Mon Jul 22 21:14:30 2024 -0700
move long and double nested field serialization to later phase of
serialization (#16769)
changes:
* moves value column serializer initialization and the call to the `writeValue`
method into `GlobalDictionaryEncodedFieldColumnWriter.writeTo`, instead of doing both
during `GlobalDictionaryEncodedFieldColumnWriter.addValue`. This shift means these
numeric value columns are now written in the per-field section that happens after
serializing the nested column raw data, so only a single compression buffer and
temp file will be needed at a time instead of the total number of nested
literal fields present in the col [...]
---
.../druid/segment/nested/DictionaryIdLookup.java | 158 +++++++++++++--------
.../GlobalDictionaryEncodedFieldColumnWriter.java | 10 +-
.../nested/ScalarDoubleFieldColumnWriter.java | 26 ++--
.../nested/ScalarLongFieldColumnWriter.java | 26 ++--
4 files changed, 128 insertions(+), 92 deletions(-)
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/DictionaryIdLookup.java
b/processing/src/main/java/org/apache/druid/segment/nested/DictionaryIdLookup.java
index f4176db220c..6827497f7a6 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/DictionaryIdLookup.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/DictionaryIdLookup.java
@@ -99,42 +99,27 @@ public final class DictionaryIdLookup implements Closeable
this.arrayDictionaryWriter = arrayDictionaryWriter;
}
- public int lookupString(@Nullable String value)
+ @Nullable
+ public Object getDictionaryValue(int id)
{
- if (stringDictionary == null) {
- // GenericIndexed v2 can write to multiple files if the dictionary is
larger than 2gb, so we use a smooshfile
- // for strings because of this. if other type dictionary writers could
potentially use multiple internal files
- // in the future, we should transition them to using this approach as
well (or build a combination smoosher and
- // mapper so that we can have a mutable smoosh)
- File stringSmoosh = FileUtils.createTempDirInLocation(tempBasePath,
StringUtils.urlEncode(name) + "__stringTempSmoosh");
- stringDictionaryFile = stringSmoosh.toPath();
- final String fileName =
NestedCommonFormatColumnSerializer.getInternalFileName(
- name,
- NestedCommonFormatColumnSerializer.STRING_DICTIONARY_FILE_NAME
- );
-
- try (
- final FileSmoosher smoosher = new FileSmoosher(stringSmoosh);
- final SmooshedWriter writer = smoosher.addWithSmooshedWriter(
- fileName,
- stringDictionaryWriter.getSerializedSize()
- )
- ) {
- stringDictionaryWriter.writeTo(writer, smoosher);
- writer.close();
- smoosher.close();
- stringBufferMapper = SmooshedFileMapper.load(stringSmoosh);
- final ByteBuffer stringBuffer = stringBufferMapper.mapFile(fileName);
- stringDictionary =
StringEncodingStrategies.getStringDictionarySupplier(
- stringBufferMapper,
- stringBuffer,
- ByteOrder.nativeOrder()
- ).get();
- }
- catch (IOException e) {
- throw new RuntimeException(e);
- }
+ ensureStringDictionaryLoaded();
+ ensureLongDictionaryLoaded();
+ ensureDoubleDictionaryLoaded();
+ ensureArrayDictionaryLoaded();
+ if (id < longOffset()) {
+ return StringUtils.fromUtf8Nullable(stringDictionary.get(id));
+ } else if (id < doubleOffset()) {
+ return longDictionary.get(id - longOffset());
+ } else if (id < arrayOffset()) {
+ return doubleDictionary.get(id - doubleOffset());
+ } else {
+ return arrayDictionary.get(id - arrayOffset());
}
+ }
+
+ public int lookupString(@Nullable String value)
+ {
+ ensureStringDictionaryLoaded();
final byte[] bytes = StringUtils.toUtf8Nullable(value);
final int index = stringDictionary.indexOf(bytes == null ? null :
ByteBuffer.wrap(bytes));
if (index < 0) {
@@ -145,13 +130,7 @@ public final class DictionaryIdLookup implements Closeable
public int lookupLong(@Nullable Long value)
{
- if (longDictionary == null) {
- longDictionaryFile = makeTempFile(name +
NestedCommonFormatColumnSerializer.LONG_DICTIONARY_FILE_NAME);
- longBuffer = mapWriter(longDictionaryFile, longDictionaryWriter);
- longDictionary = FixedIndexed.read(longBuffer, TypeStrategies.LONG,
ByteOrder.nativeOrder(), Long.BYTES).get();
- // reset position
- longBuffer.position(0);
- }
+ ensureLongDictionaryLoaded();
final int index = longDictionary.indexOf(value);
if (index < 0) {
throw DruidException.defensive("Value not found in column[%s] long
dictionary", name);
@@ -161,18 +140,7 @@ public final class DictionaryIdLookup implements Closeable
public int lookupDouble(@Nullable Double value)
{
- if (doubleDictionary == null) {
- doubleDictionaryFile = makeTempFile(name +
NestedCommonFormatColumnSerializer.DOUBLE_DICTIONARY_FILE_NAME);
- doubleBuffer = mapWriter(doubleDictionaryFile, doubleDictionaryWriter);
- doubleDictionary = FixedIndexed.read(
- doubleBuffer,
- TypeStrategies.DOUBLE,
- ByteOrder.nativeOrder(),
- Double.BYTES
- ).get();
- // reset position
- doubleBuffer.position(0);
- }
+ ensureDoubleDictionaryLoaded();
final int index = doubleDictionary.indexOf(value);
if (index < 0) {
throw DruidException.defensive("Value not found in column[%s] double
dictionary", name);
@@ -182,13 +150,7 @@ public final class DictionaryIdLookup implements Closeable
public int lookupArray(@Nullable int[] value)
{
- if (arrayDictionary == null) {
- arrayDictionaryFile = makeTempFile(name +
NestedCommonFormatColumnSerializer.ARRAY_DICTIONARY_FILE_NAME);
- arrayBuffer = mapWriter(arrayDictionaryFile, arrayDictionaryWriter);
- arrayDictionary = FrontCodedIntArrayIndexed.read(arrayBuffer,
ByteOrder.nativeOrder()).get();
- // reset position
- arrayBuffer.position(0);
- }
+ ensureArrayDictionaryLoaded();
final int index = arrayDictionary.indexOf(value);
if (index < 0) {
throw DruidException.defensive("Value not found in column[%s] array
dictionary", name);
@@ -256,6 +218,82 @@ public final class DictionaryIdLookup implements Closeable
return doubleOffset() + (doubleDictionaryWriter != null ?
doubleDictionaryWriter.getCardinality() : 0);
}
+ private void ensureStringDictionaryLoaded()
+ {
+ if (stringDictionary == null) {
+ // GenericIndexed v2 can write to multiple files if the dictionary is
larger than 2gb, so we use a smooshfile
+ // for strings because of this. if other type dictionary writers could
potentially use multiple internal files
+ // in the future, we should transition them to using this approach as
well (or build a combination smoosher and
+ // mapper so that we can have a mutable smoosh)
+ File stringSmoosh = FileUtils.createTempDirInLocation(tempBasePath,
StringUtils.urlEncode(name) + "__stringTempSmoosh");
+ stringDictionaryFile = stringSmoosh.toPath();
+ final String fileName =
NestedCommonFormatColumnSerializer.getInternalFileName(
+ name,
+ NestedCommonFormatColumnSerializer.STRING_DICTIONARY_FILE_NAME
+ );
+
+ try (
+ final FileSmoosher smoosher = new FileSmoosher(stringSmoosh);
+ final SmooshedWriter writer = smoosher.addWithSmooshedWriter(
+ fileName,
+ stringDictionaryWriter.getSerializedSize()
+ )
+ ) {
+ stringDictionaryWriter.writeTo(writer, smoosher);
+ writer.close();
+ smoosher.close();
+ stringBufferMapper = SmooshedFileMapper.load(stringSmoosh);
+ final ByteBuffer stringBuffer = stringBufferMapper.mapFile(fileName);
+ stringDictionary =
StringEncodingStrategies.getStringDictionarySupplier(
+ stringBufferMapper,
+ stringBuffer,
+ ByteOrder.nativeOrder()
+ ).get();
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private void ensureLongDictionaryLoaded()
+ {
+ if (longDictionary == null) {
+ longDictionaryFile = makeTempFile(name +
NestedCommonFormatColumnSerializer.LONG_DICTIONARY_FILE_NAME);
+ longBuffer = mapWriter(longDictionaryFile, longDictionaryWriter);
+ longDictionary = FixedIndexed.read(longBuffer, TypeStrategies.LONG,
ByteOrder.nativeOrder(), Long.BYTES).get();
+ // reset position
+ longBuffer.position(0);
+ }
+ }
+
+ private void ensureDoubleDictionaryLoaded()
+ {
+ if (doubleDictionary == null) {
+ doubleDictionaryFile = makeTempFile(name +
NestedCommonFormatColumnSerializer.DOUBLE_DICTIONARY_FILE_NAME);
+ doubleBuffer = mapWriter(doubleDictionaryFile, doubleDictionaryWriter);
+ doubleDictionary = FixedIndexed.read(
+ doubleBuffer,
+ TypeStrategies.DOUBLE,
+ ByteOrder.nativeOrder(),
+ Double.BYTES
+ ).get();
+ // reset position
+ doubleBuffer.position(0);
+ }
+ }
+
+ private void ensureArrayDictionaryLoaded()
+ {
+ if (arrayDictionary == null && arrayDictionaryWriter != null) {
+ arrayDictionaryFile = makeTempFile(name +
NestedCommonFormatColumnSerializer.ARRAY_DICTIONARY_FILE_NAME);
+ arrayBuffer = mapWriter(arrayDictionaryFile, arrayDictionaryWriter);
+ arrayDictionary = FrontCodedIntArrayIndexed.read(arrayBuffer,
ByteOrder.nativeOrder()).get();
+ // reset position
+ arrayBuffer.position(0);
+ }
+ }
+
private Path makeTempFile(String name)
{
try {
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryEncodedFieldColumnWriter.java
b/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryEncodedFieldColumnWriter.java
index aa6a71ae754..d9f00bb2321 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryEncodedFieldColumnWriter.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryEncodedFieldColumnWriter.java
@@ -117,8 +117,8 @@ public abstract class
GlobalDictionaryEncodedFieldColumnWriter<T>
}
/**
- * Hook to allow implementors the chance to do additional operations during
{@link #addValue(int, Object)}, such as
- * writing an additional value column
+ * Hook to allow implementors the chance to do additional operations during
{@link #writeTo(int, FileSmoosher)}, such
+ * as writing an additional value column
*/
void writeValue(@Nullable T value) throws IOException
{
@@ -159,7 +159,6 @@ public abstract class
GlobalDictionaryEncodedFieldColumnWriter<T>
localId = localDictionary.add(globalId);
}
intermediateValueWriter.write(localId);
- writeValue(value);
cursorPosition++;
}
@@ -168,11 +167,9 @@ public abstract class
GlobalDictionaryEncodedFieldColumnWriter<T>
*/
private void fillNull(int row) throws IOException
{
- final T value = processValue(row, null);
final int localId = localDictionary.add(0);
while (cursorPosition < row) {
intermediateValueWriter.write(localId);
- writeValue(value);
cursorPosition++;
}
}
@@ -252,6 +249,7 @@ public abstract class
GlobalDictionaryEncodedFieldColumnWriter<T>
final int unsortedLocalId = rows.nextInt();
final int sortedLocalId = unsortedToSorted[unsortedLocalId];
encodedValueSerializer.addValue(sortedLocalId);
+ writeValue((T)
globalDictionaryIdLookup.getDictionaryValue(unsortedToGlobal[unsortedLocalId]));
bitmaps[sortedLocalId].add(rowCount++);
}
@@ -307,7 +305,7 @@ public abstract class
GlobalDictionaryEncodedFieldColumnWriter<T>
}
}
- private void openColumnSerializer(SegmentWriteOutMedium medium, int maxId)
throws IOException
+ public void openColumnSerializer(SegmentWriteOutMedium medium, int maxId)
throws IOException
{
if (indexSpec.getDimensionCompression() !=
CompressionStrategy.UNCOMPRESSED) {
this.version = DictionaryEncodedColumnPartSerde.VERSION.COMPRESSED;
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleFieldColumnWriter.java
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleFieldColumnWriter.java
index 8ccd528715b..09e8dc121c8 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleFieldColumnWriter.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleFieldColumnWriter.java
@@ -59,12 +59,22 @@ public final class ScalarDoubleFieldColumnWriter extends
GlobalDictionaryEncoded
}
@Override
- public void open() throws IOException
+ void writeValue(@Nullable Double value) throws IOException
+ {
+ if (value == null) {
+ doublesSerializer.add(0.0);
+ } else {
+ doublesSerializer.add(value);
+ }
+ }
+
+ @Override
+ public void openColumnSerializer(SegmentWriteOutMedium medium, int maxId)
throws IOException
{
- super.open();
+ super.openColumnSerializer(medium, maxId);
doublesSerializer = CompressionFactory.getDoubleSerializer(
fieldName,
- segmentWriteOutMedium,
+ medium,
StringUtils.format("%s.double_column", fieldName),
ByteOrder.nativeOrder(),
indexSpec.getDimensionCompression(),
@@ -73,16 +83,6 @@ public final class ScalarDoubleFieldColumnWriter extends
GlobalDictionaryEncoded
doublesSerializer.open();
}
- @Override
- void writeValue(@Nullable Double value) throws IOException
- {
- if (value == null) {
- doublesSerializer.add(0.0);
- } else {
- doublesSerializer.add(value);
- }
- }
-
@Override
void writeColumnTo(WritableByteChannel channel, FileSmoosher smoosher)
throws IOException
{
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongFieldColumnWriter.java
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongFieldColumnWriter.java
index 66b5eca18d9..d9191c4e805 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongFieldColumnWriter.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongFieldColumnWriter.java
@@ -59,12 +59,22 @@ public final class ScalarLongFieldColumnWriter extends
GlobalDictionaryEncodedFi
}
@Override
- public void open() throws IOException
+ void writeValue(@Nullable Long value) throws IOException
+ {
+ if (value == null) {
+ longsSerializer.add(0L);
+ } else {
+ longsSerializer.add(value);
+ }
+ }
+
+ @Override
+ public void openColumnSerializer(SegmentWriteOutMedium medium, int maxId)
throws IOException
{
- super.open();
+ super.openColumnSerializer(medium, maxId);
longsSerializer = CompressionFactory.getLongSerializer(
fieldName,
- segmentWriteOutMedium,
+ medium,
StringUtils.format("%s.long_column", fieldName),
ByteOrder.nativeOrder(),
indexSpec.getLongEncoding(),
@@ -74,16 +84,6 @@ public final class ScalarLongFieldColumnWriter extends
GlobalDictionaryEncodedFi
longsSerializer.open();
}
- @Override
- void writeValue(@Nullable Long value) throws IOException
- {
- if (value == null) {
- longsSerializer.add(0L);
- } else {
- longsSerializer.add(value);
- }
- }
-
@Override
void writeColumnTo(WritableByteChannel channel, FileSmoosher smoosher)
throws IOException
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]