github-code-scanning[bot] commented on code in PR #14919:
URL: https://github.com/apache/druid/pull/14919#discussion_r1308108400
##########
processing/src/main/java/org/apache/druid/segment/nested/DictionaryIdLookup.java:
##########
@@ -19,114 +19,308 @@
package org.apache.druid.segment.nested;
-import com.google.common.base.Preconditions;
-import it.unimi.dsi.fastutil.doubles.Double2IntLinkedOpenHashMap;
-import it.unimi.dsi.fastutil.doubles.Double2IntMap;
-import it.unimi.dsi.fastutil.longs.Long2IntLinkedOpenHashMap;
-import it.unimi.dsi.fastutil.longs.Long2IntMap;
-import it.unimi.dsi.fastutil.objects.Object2IntAVLTreeMap;
-import it.unimi.dsi.fastutil.objects.Object2IntLinkedOpenHashMap;
-import it.unimi.dsi.fastutil.objects.Object2IntMap;
-import org.apache.druid.segment.data.FrontCodedIntArrayIndexedWriter;
+import com.google.common.primitives.Ints;
+import org.apache.druid.annotations.SuppressFBWarnings;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.ByteBufferUtils;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
+import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter;
+import org.apache.druid.segment.column.StringEncodingStrategies;
+import org.apache.druid.segment.column.TypeStrategies;
+import org.apache.druid.segment.data.DictionaryWriter;
+import org.apache.druid.segment.data.FixedIndexed;
+import org.apache.druid.segment.data.FrontCodedIntArrayIndexed;
+import org.apache.druid.segment.data.Indexed;
import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.GatheringByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.EnumSet;
/**
- * Ingestion time dictionary identifier lookup, used by {@link
NestedCommonFormatColumnSerializer} to build a global
- * dictionary id to value mapping for the 'stacked' global value dictionaries.
+ * Value to dictionary id lookup, backed with memory mapped dictionaries
populated lazily by the supplied
+ * {@link DictionaryWriter}.
*/
-public class DictionaryIdLookup
+public final class DictionaryIdLookup implements Closeable
{
- private final Object2IntMap<String> stringLookup;
+ private final String name;
+ @Nullable
+ private final DictionaryWriter<String> stringDictionaryWriter;
+ private SmooshedFileMapper stringBufferMapper = null;
+ private Indexed<ByteBuffer> stringDictionary = null;
- private final Long2IntMap longLookup;
+ @Nullable
+ private final DictionaryWriter<Long> longDictionaryWriter;
+ private MappedByteBuffer longBuffer = null;
+ private FixedIndexed<Long> longDictionary = null;
- private final Double2IntMap doubleLookup;
+ @Nullable
+ private final DictionaryWriter<Double> doubleDictionaryWriter;
+ MappedByteBuffer doubleBuffer = null;
+ FixedIndexed<Double> doubleDictionary = null;
- private final Object2IntMap<int[]> arrayLookup;
+ @Nullable
+ private final DictionaryWriter<int[]> arrayDictionaryWriter;
+ private MappedByteBuffer arrayBuffer = null;
+ private FrontCodedIntArrayIndexed arrayDictionary = null;
- private int dictionarySize;
+ public DictionaryIdLookup(
+ String name,
+ @Nullable DictionaryWriter<String> stringDictionaryWriter,
+ @Nullable DictionaryWriter<Long> longDictionaryWriter,
+ @Nullable DictionaryWriter<Double> doubleDictionaryWriter,
+ @Nullable DictionaryWriter<int[]> arrayDictionaryWriter
+ )
+ {
+ this.name = name;
+ this.stringDictionaryWriter = stringDictionaryWriter;
+ this.longDictionaryWriter = longDictionaryWriter;
+ this.doubleDictionaryWriter = doubleDictionaryWriter;
+ this.arrayDictionaryWriter = arrayDictionaryWriter;
+ }
- public DictionaryIdLookup()
+ public int lookupString(@Nullable String value)
{
- this.stringLookup = new Object2IntLinkedOpenHashMap<>();
- stringLookup.defaultReturnValue(-1);
- this.longLookup = new Long2IntLinkedOpenHashMap();
- longLookup.defaultReturnValue(-1);
- this.doubleLookup = new Double2IntLinkedOpenHashMap();
- doubleLookup.defaultReturnValue(-1);
- this.arrayLookup = new
Object2IntAVLTreeMap<>(FrontCodedIntArrayIndexedWriter.ARRAY_COMPARATOR);
- this.arrayLookup.defaultReturnValue(-1);
+ if (stringDictionary == null) {
+ // GenericIndexed v2 can write to multiple files if the dictionary is
larger than 2gb, so we use a smooshfile
+ // for strings because of this. if other type dictionary writers could
potentially use multiple internal files
+ // in the future, we should transition them to using this approach as
well (or build a combination smoosher and
+ // mapper so that we can have a mutable smoosh)
+ File stringSmoosh = FileUtils.createTempDir(name + "__stringTempSmoosh");
+ final String fileName =
NestedCommonFormatColumnSerializer.getInternalFileName(
+ name,
+ NestedCommonFormatColumnSerializer.STRING_DICTIONARY_FILE_NAME
+ );
+ final FileSmoosher smoosher = new FileSmoosher(stringSmoosh);
+ try (final SmooshedWriter writer = smoosher.addWithSmooshedWriter(
+ fileName,
+ stringDictionaryWriter.getSerializedSize()
+ )) {
+ stringDictionaryWriter.writeTo(writer, smoosher);
+ writer.close();
+ smoosher.close();
+ stringBufferMapper = SmooshedFileMapper.load(stringSmoosh);
+ final ByteBuffer stringBuffer = stringBufferMapper.mapFile(fileName);
+ stringDictionary =
StringEncodingStrategies.getStringDictionarySupplier(
+ stringBufferMapper,
+ stringBuffer,
+ ByteOrder.nativeOrder()
+ ).get();
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ final byte[] bytes = StringUtils.toUtf8Nullable(value);
+ final int index = stringDictionary.indexOf(bytes == null ? null :
ByteBuffer.wrap(bytes));
+ if (index < 0) {
+ throw DruidException.defensive("Value not found in string dictionary");
+ }
+ return index;
}
- public void addString(@Nullable String value)
+ public int lookupLong(@Nullable Long value)
{
- Preconditions.checkState(
- longLookup.size() == 0 && doubleLookup.size() == 0,
- "All string values must be inserted to the lookup before long and
double types"
- );
- int id = dictionarySize++;
- stringLookup.put(value, id);
+ if (longDictionary == null) {
+ Path longFile = makeTempFile(name +
NestedCommonFormatColumnSerializer.LONG_DICTIONARY_FILE_NAME);
+ longBuffer = mapWriter(longFile, longDictionaryWriter);
+ longDictionary = FixedIndexed.read(longBuffer, TypeStrategies.LONG,
ByteOrder.nativeOrder(), Long.BYTES).get();
+ // reset position
+ longBuffer.position(0);
+ }
+ final int index = longDictionary.indexOf(value);
+ if (index < 0) {
+ throw DruidException.defensive("Value not found in long dictionary");
+ }
+ return index + longOffset();
}
- // used when there are no string values to ensure that 0 is used for the
null value
- public void addNumericNull()
+ public int lookupDouble(@Nullable Double value)
{
- Preconditions.checkState(
- stringLookup.size() == 0 && longLookup.size() == 0 &&
doubleLookup.size() == 0,
- "Lookup must be empty to add implicit null"
- );
- dictionarySize++;
+ if (doubleDictionary == null) {
+ Path doubleFile = makeTempFile(name +
NestedCommonFormatColumnSerializer.DOUBLE_DICTIONARY_FILE_NAME);
+ doubleBuffer = mapWriter(doubleFile, doubleDictionaryWriter);
+ doubleDictionary = FixedIndexed.read(doubleBuffer,
TypeStrategies.DOUBLE, ByteOrder.nativeOrder(), Double.BYTES).get();
+ // reset position
+ doubleBuffer.position(0);
+ }
+ final int index = doubleDictionary.indexOf(value);
+ if (index < 0) {
+ throw DruidException.defensive("Value not found in double dictionary");
+ }
+ return index + doubleOffset();
}
- public int lookupString(@Nullable String value)
+ public int lookupArray(@Nullable int[] value)
{
- return stringLookup.getInt(value);
+ if (arrayDictionary == null) {
+ Path arrayFile = makeTempFile(name +
NestedCommonFormatColumnSerializer.ARRAY_DICTIONARY_FILE_NAME);
+ arrayBuffer = mapWriter(arrayFile, arrayDictionaryWriter);
+ arrayDictionary = FrontCodedIntArrayIndexed.read(arrayBuffer,
ByteOrder.nativeOrder()).get();
+ // reset position
+ arrayBuffer.position(0);
+ }
+ final int index = arrayDictionary.indexOf(value);
+ if (index < 0) {
+ throw DruidException.defensive("Value not found in array dictionary");
+ }
+ return index + arrayOffset();
}
- public void addLong(long value)
+ @Nullable
+ public SmooshedFileMapper getStringBufferMapper()
{
- Preconditions.checkState(
- doubleLookup.size() == 0,
- "All long values must be inserted to the lookup before double types"
- );
- int id = dictionarySize++;
- longLookup.put(value, id);
+ return stringBufferMapper;
}
- public int lookupLong(@Nullable Long value)
+ @Nullable
+ public ByteBuffer getLongBuffer()
{
- if (value == null) {
- return 0;
- }
- return longLookup.get(value.longValue());
+ return longBuffer;
}
- public void addDouble(double value)
+ @Nullable
+ public ByteBuffer getDoubleBuffer()
{
- int id = dictionarySize++;
- doubleLookup.put(value, id);
+ return doubleBuffer;
}
- public int lookupDouble(@Nullable Double value)
+ @Nullable
+ public ByteBuffer getArrayBuffer()
{
- if (value == null) {
- return 0;
+ return arrayBuffer;
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ if (stringBufferMapper != null) {
+ stringBufferMapper.close();
+ }
+ if (longBuffer != null) {
+ ByteBufferUtils.unmap(longBuffer);
+ }
+ if (doubleBuffer != null) {
+ ByteBufferUtils.unmap(doubleBuffer);
+ }
+ if (arrayBuffer != null) {
+ ByteBufferUtils.unmap(arrayBuffer);
}
- return doubleLookup.get(value.doubleValue());
}
- public void addArray(int[] value)
+ private int longOffset()
{
- int id = dictionarySize++;
- arrayLookup.put(value, id);
+ return stringDictionaryWriter != null ?
stringDictionaryWriter.getCardinality() : 0;
}
- public int lookupArray(@Nullable int[] value)
+ private int doubleOffset()
+ {
+ return longOffset() + (longDictionaryWriter != null ?
longDictionaryWriter.getCardinality() : 0);
+ }
+
+ private int arrayOffset()
+ {
+ return doubleOffset() + (doubleDictionaryWriter != null ?
doubleDictionaryWriter.getCardinality() : 0);
+ }
+
+ private Path makeTempFile(String name)
{
- if (value == null) {
- return 0;
+ try {
+ return Files.createTempFile(name, ".tmp");
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION")
+ private MappedByteBuffer mapWriter(Path path, DictionaryWriter<?> writer)
+ {
+ final EnumSet<StandardOpenOption> options = EnumSet.of(
+ StandardOpenOption.READ,
+ StandardOpenOption.WRITE,
+ StandardOpenOption.CREATE,
+ StandardOpenOption.TRUNCATE_EXISTING
+ );
+
+ try (FileChannel fileChannel = FileChannel.open(path, options);
+ GatheringByteChannel smooshChannel = makeWriter(fileChannel,
writer.getSerializedSize())) {
+ //noinspection DataFlowIssue
+ writer.writeTo(smooshChannel, null);
+ return fileChannel.map(FileChannel.MapMode.READ_ONLY, 0,
writer.getSerializedSize());
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
}
- return arrayLookup.getInt(value);
+ }
+
+ private GatheringByteChannel makeWriter(FileChannel channel, long size)
throws IOException
+ {
+ // basically same code as smooshed writer, can't use channel directly
because copying between channels
+ // doesn't handle size of source channel correctly
+ return new GatheringByteChannel()
+ {
+ private boolean isClosed = false;
+ private int currOffset = 0;
+
+ @Override
+ public boolean isOpen()
+ {
+ return !isClosed;
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ channel.close();
+ isClosed = true;
+ }
+
+ public int bytesLeft()
+ {
+ return (int) (size - currOffset);
+ }
+
+ @Override
+ public int write(ByteBuffer buffer) throws IOException
+ {
+ return addToOffset(channel.write(buffer));
+ }
+
+ @Override
+ public long write(ByteBuffer[] srcs, int offset, int length) throws
IOException
+ {
+ return addToOffset(channel.write(srcs, offset, length));
+ }
+
+ @Override
+ public long write(ByteBuffer[] srcs) throws IOException
+ {
+ return addToOffset(channel.write(srcs));
+ }
+
+ public int addToOffset(long numBytesWritten)
+ {
+ if (numBytesWritten > bytesLeft()) {
+ throw new ISE("Wrote more bytes[%,d] than available[%,d]. Don't do
that.", numBytesWritten, bytesLeft());
+ }
+ currOffset += numBytesWritten;
Review Comment:
## Implicit narrowing conversion in compound assignment
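Implicit cast of source type long to narrower destination type int:
`currOffset` is declared as an int, while `numBytesWritten` is a long, so the
compound assignment `currOffset += numBytesWritten` narrows the value without
an explicit cast.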
[Show more
details](https://github.com/apache/druid/security/code-scanning/5736)
##########
processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java:
##########
@@ -378,10 +382,30 @@
channel.write(ByteBuffer.wrap(new byte[]{variantTypeSetByte}));
}
- writeInternal(smoosher, dictionaryWriter, STRING_DICTIONARY_FILE_NAME);
- writeInternal(smoosher, longDictionaryWriter, LONG_DICTIONARY_FILE_NAME);
- writeInternal(smoosher, doubleDictionaryWriter,
DOUBLE_DICTIONARY_FILE_NAME);
- writeInternal(smoosher, arrayDictionaryWriter, ARRAY_DICTIONARY_FILE_NAME);
+ if (dictionaryIdLookup.getStringBufferMapper() != null) {
+ SmooshedFileMapper fileMapper =
dictionaryIdLookup.getStringBufferMapper();
+ for (String name : fileMapper.getInternalFilenames()) {
Review Comment:
## Possible confusion of local and field
Confusing name: method [writeTo](1) also refers to field [name](2) (without
qualifying it with 'this').
[Show more
details](https://github.com/apache/druid/security/code-scanning/5739)
##########
processing/src/test/java/org/apache/druid/segment/data/FixedIndexedTest.java:
##########
@@ -141,6 +141,35 @@
for (Long aLong : LONGS) {
writer.write(aLong);
}
+ Iterator<Long> longIterator = writer.getIterator();
+ int ctr = 0;
+ int totalCount = withNull ? 1 + LONGS.length : LONGS.length;
+ for (int i = 0; i < totalCount; i++) {
+ if (withNull) {
+ if (i == 0) {
+ Assert.assertNull(writer.get(i));
+ } else {
+ Assert.assertEquals(" index: " + i, LONGS[i - 1], writer.get(i));
+ }
+ } else {
+ Assert.assertEquals(" index: " + i, LONGS[i], writer.get(i));
Review Comment:
## Array index out of bounds
This array access might be out of bounds, as the index might be equal to the
array length.
[Show more
details](https://github.com/apache/druid/security/code-scanning/5737)
##########
processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java:
##########
@@ -433,10 +437,31 @@
writeV0Header(channel, columnNameBytes);
fieldsWriter.writeTo(channel, smoosher);
fieldsInfoWriter.writeTo(channel, smoosher);
- writeInternal(smoosher, dictionaryWriter, STRING_DICTIONARY_FILE_NAME);
- writeInternal(smoosher, longDictionaryWriter, LONG_DICTIONARY_FILE_NAME);
- writeInternal(smoosher, doubleDictionaryWriter,
DOUBLE_DICTIONARY_FILE_NAME);
- writeInternal(smoosher, arrayDictionaryWriter, ARRAY_DICTIONARY_FILE_NAME);
+
+
+ if (globalDictionaryIdLookup.getStringBufferMapper() != null) {
+ SmooshedFileMapper fileMapper =
globalDictionaryIdLookup.getStringBufferMapper();
+ for (String name : fileMapper.getInternalFilenames()) {
Review Comment:
## Possible confusion of local and field
Confusing name: method [writeTo](1) also refers to field [name](2) (without
qualifying it with 'this').
[Show more
details](https://github.com/apache/druid/security/code-scanning/5738)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]