This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch 31.0.1
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/31.0.1 by this push:
new c9aac1bb405 projection segment merge fixes (#17460) (#17503)
c9aac1bb405 is described below
commit c9aac1bb4058acd335fde36e55d3a9985f329b94
Author: Clint Wylie <[email protected]>
AuthorDate: Fri Nov 22 14:58:35 2024 -0800
projection segment merge fixes (#17460) (#17503)
changes:
* fix issue when merging projections from multiple-incremental persists
which assumed that some 'dim conversion' buffers were not yet closed, but they
already were (by the merging iterator). fix involves selectively persisting
these conversion buffers to temp files in the segment write out directory and
mapping them and tying them to the segment level closer so that they are
available after the lifetime of the parent merger
* modify auto column serializers to use segment write out directory for
temp files instead of java.io.tmpdir
* fix queryable index projection to not put the time-like column as a
dimension, instead only adding it as __time
* use smoosh for temp files so can safely write any Serializer to a temp
smoosh
---
.../apache/druid/common/utils/SerializerUtils.java | 1 -
.../apache/druid/segment/AutoTypeColumnMerger.java | 7 +-
.../segment/DictionaryEncodedColumnMerger.java | 163 ++++++++++++-
.../org/apache/druid/segment/DimensionHandler.java | 44 +++-
.../apache/druid/segment/DimensionMergerV9.java | 19 +-
.../druid/segment/DoubleDimensionHandler.java | 2 +
.../druid/segment/FloatDimensionHandler.java | 2 +
.../java/org/apache/druid/segment/IndexIO.java | 1 +
.../org/apache/druid/segment/IndexMergerV9.java | 15 ++
.../apache/druid/segment/LongDimensionHandler.java | 2 +
.../segment/NestedCommonFormatColumnHandler.java | 4 +-
.../druid/segment/NestedDataColumnHandlerV4.java | 2 +
.../apache/druid/segment/SimpleQueryableIndex.java | 22 +-
.../druid/segment/StringDimensionHandler.java | 3 +
.../druid/segment/StringDimensionMergerV9.java | 4 +-
.../druid/segment/nested/DictionaryIdLookup.java | 256 ++++++---------------
.../nested/NestedCommonFormatColumnSerializer.java | 10 +-
.../segment/nested/NestedDataColumnSerializer.java | 36 +--
.../nested/NestedDataColumnSerializerV4.java | 2 +-
.../nested/ScalarDoubleColumnSerializer.java | 10 +-
.../segment/nested/ScalarLongColumnSerializer.java | 10 +-
.../nested/ScalarStringColumnSerializer.java | 6 +-
.../segment/nested/VariantColumnSerializer.java | 28 +--
.../druid/segment/serde/ColumnSerializerUtils.java | 33 ++-
.../org/apache/druid/segment/serde/Serializer.java | 7 +-
.../apache/druid/segment/IndexMergerTestBase.java | 235 +++++++++++++++++++
.../segment/nested/DictionaryIdLookupTest.java | 7 +-
.../nested/NestedDataColumnSupplierTest.java | 2 +-
.../nested/ScalarDoubleColumnSupplierTest.java | 2 +-
.../nested/ScalarLongColumnSupplierTest.java | 2 +-
.../nested/ScalarStringColumnSupplierTest.java | 2 +-
.../segment/nested/VariantColumnSupplierTest.java | 2 +-
32 files changed, 667 insertions(+), 274 deletions(-)
diff --git
a/processing/src/main/java/org/apache/druid/common/utils/SerializerUtils.java
b/processing/src/main/java/org/apache/druid/common/utils/SerializerUtils.java
index 73b9d1cd447..78663d49080 100644
---
a/processing/src/main/java/org/apache/druid/common/utils/SerializerUtils.java
+++
b/processing/src/main/java/org/apache/druid/common/utils/SerializerUtils.java
@@ -33,7 +33,6 @@ import java.nio.channels.WritableByteChannel;
public class SerializerUtils
{
-
public <T extends OutputStream> void writeString(T out, String name) throws
IOException
{
byte[] nameBytes = StringUtils.toUtf8(name);
diff --git
a/processing/src/main/java/org/apache/druid/segment/AutoTypeColumnMerger.java
b/processing/src/main/java/org/apache/druid/segment/AutoTypeColumnMerger.java
index c2bba50b3d8..0f8ea4df145 100644
---
a/processing/src/main/java/org/apache/druid/segment/AutoTypeColumnMerger.java
+++
b/processing/src/main/java/org/apache/druid/segment/AutoTypeColumnMerger.java
@@ -46,6 +46,7 @@ import
org.apache.druid.segment.serde.NestedCommonFormatColumnPartSerde;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import javax.annotation.Nullable;
+import java.io.File;
import java.io.IOException;
import java.nio.ByteOrder;
import java.nio.IntBuffer;
@@ -87,6 +88,8 @@ public class AutoTypeColumnMerger implements DimensionMergerV9
private boolean isVariantType = false;
private byte variantTypeByte = 0x00;
+ private final File segmentBaseDir;
+
/**
* @param name column name
* @param outputName output smoosh file name. if this is a base
table column, it will be the equivalent to
@@ -105,6 +108,7 @@ public class AutoTypeColumnMerger implements
DimensionMergerV9
@Nullable ColumnType castToType,
IndexSpec indexSpec,
SegmentWriteOutMedium segmentWriteOutMedium,
+ File segmentBaseDir,
Closer closer
)
{
@@ -114,6 +118,7 @@ public class AutoTypeColumnMerger implements
DimensionMergerV9
this.castToType = castToType;
this.indexSpec = indexSpec;
this.segmentWriteOutMedium = segmentWriteOutMedium;
+ this.segmentBaseDir = segmentBaseDir;
this.closer = closer;
}
@@ -265,7 +270,7 @@ public class AutoTypeColumnMerger implements
DimensionMergerV9
);
}
- serializer.openDictionaryWriter();
+ serializer.openDictionaryWriter(segmentBaseDir);
serializer.serializeFields(mergedFields);
int stringCardinality;
diff --git
a/processing/src/main/java/org/apache/druid/segment/DictionaryEncodedColumnMerger.java
b/processing/src/main/java/org/apache/druid/segment/DictionaryEncodedColumnMerger.java
index a498e081b2c..6859715cf38 100644
---
a/processing/src/main/java/org/apache/druid/segment/DictionaryEncodedColumnMerger.java
+++
b/processing/src/main/java/org/apache/druid/segment/DictionaryEncodedColumnMerger.java
@@ -26,10 +26,13 @@ import it.unimi.dsi.fastutil.ints.IntIterator;
import org.apache.druid.collections.bitmap.BitmapFactory;
import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.collections.bitmap.MutableBitmap;
+import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.filter.DruidPredicateFactory;
import org.apache.druid.query.filter.ValueMatcher;
@@ -51,13 +54,21 @@ import
org.apache.druid.segment.data.SingleValueColumnarIntsSerializer;
import
org.apache.druid.segment.data.V3CompressedVSizeColumnarMultiIntsSerializer;
import org.apache.druid.segment.data.VSizeColumnarIntsSerializer;
import org.apache.druid.segment.data.VSizeColumnarMultiIntsSerializer;
+import org.apache.druid.segment.serde.ColumnSerializerUtils;
+import org.apache.druid.segment.serde.Serializer;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.nio.IntBuffer;
+import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
@@ -101,6 +112,16 @@ public abstract class DictionaryEncodedColumnMerger<T
extends Comparable<T>> imp
@Nullable
protected T firstDictionaryValue;
+ protected File segmentBaseDir;
+
+ /**
+ * This becomes non-null if {@link #markAsParent()} is called indicating
that this column is a base table 'parent'
+ * to some projection column, which requires persisting id conversion
buffers to temporary files. If there are no
+ * projections defined (or projections which reference this column) then id
conversion buffers will be freed after
+ * calling {@link #writeIndexes(List)}
+ */
+ @MonotonicNonNull
+ protected PersistedIdConversions persistedIdConversions;
public DictionaryEncodedColumnMerger(
String dimensionName,
@@ -109,6 +130,7 @@ public abstract class DictionaryEncodedColumnMerger<T
extends Comparable<T>> imp
SegmentWriteOutMedium segmentWriteOutMedium,
ColumnCapabilities capabilities,
ProgressIndicator progress,
+ File segmentBaseDir,
Closer closer
)
{
@@ -118,8 +140,8 @@ public abstract class DictionaryEncodedColumnMerger<T
extends Comparable<T>> imp
this.capabilities = capabilities;
this.segmentWriteOutMedium = segmentWriteOutMedium;
this.nullRowsBitmap =
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
-
this.progress = progress;
+ this.segmentBaseDir = segmentBaseDir;
this.closer = closer;
}
@@ -129,6 +151,19 @@ public abstract class DictionaryEncodedColumnMerger<T
extends Comparable<T>> imp
@Nullable
protected abstract T coerceValue(T value);
+ @Override
+ public void markAsParent()
+ {
+ final File tmpOutputFilesDir = new File(segmentBaseDir, "tmp_" +
outputName + "_merger");
+ try {
+ FileUtils.mkdirp(tmpOutputFilesDir);
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ persistedIdConversions = closer.register(new
PersistedIdConversions(tmpOutputFilesDir));
+ }
+
@Override
public void writeMergedValueDictionary(List<IndexableAdapter> adapters)
throws IOException
{
@@ -192,7 +227,18 @@ public abstract class DictionaryEncodedColumnMerger<T
extends Comparable<T>> imp
writeDictionary(() -> dictionaryMergeIterator);
for (int i = 0; i < adapters.size(); i++) {
if (dimValueLookups[i] != null &&
dictionaryMergeIterator.needConversion(i)) {
- dimConversions.set(i, dictionaryMergeIterator.conversions[i]);
+ final IntBuffer conversionBuffer;
+ if (persistedIdConversions != null) {
+ // if we are a projection parent column, persist the id mapping
buffer so that child mergers have access
+ // to the mappings during serialization to adjust their dictionary
ids as needed when serializing
+ conversionBuffer = persistedIdConversions.map(
+ dimensionName + "_idConversions_" + i,
+ dictionaryMergeIterator.conversions[i]
+ );
+ } else {
+ conversionBuffer = dictionaryMergeIterator.conversions[i];
+ }
+ dimConversions.set(i, conversionBuffer);
}
}
cardinality = dictionaryMergeIterator.getCardinality();
@@ -702,4 +748,117 @@ public abstract class DictionaryEncodedColumnMerger<T
extends Comparable<T>> imp
void mergeIndexes(int dictId, MutableBitmap mergedIndexes) throws
IOException;
void write() throws IOException;
}
+
+ protected static class IdConversionSerializer implements Serializer
+ {
+ private final IntBuffer buffer;
+ private final ByteBuffer scratch;
+
+ protected IdConversionSerializer(IntBuffer buffer)
+ {
+ this.buffer = buffer.asReadOnlyBuffer();
+ this.buffer.position(0);
+ this.scratch =
ByteBuffer.allocate(Integer.BYTES).order(ByteOrder.nativeOrder());
+ }
+
+ @Override
+ public long getSerializedSize()
+ {
+ return (long) buffer.capacity() * Integer.BYTES;
+ }
+
+ @Override
+ public void writeTo(WritableByteChannel channel, FileSmoosher smoosher)
throws IOException
+ {
+ // currently no support for id conversion buffers larger than 2gb
+ buffer.position(0);
+ while (buffer.remaining() > 0) {
+ scratch.position(0);
+ scratch.putInt(buffer.get());
+ scratch.flip();
+ channel.write(scratch);
+ }
+ }
+ }
+
+ /**
+ * Closer of {@link PersistedIdConversion} and a parent path which they are
stored in for easy cleanup when the
+ * segment is closed.
+ */
+ protected static class PersistedIdConversions implements Closeable
+ {
+ private final File tempDir;
+ private final Closer closer;
+
+ protected PersistedIdConversions(File tempDir)
+ {
+ this.tempDir = tempDir;
+ this.closer = Closer.create();
+ }
+
+ @Nullable
+ public IntBuffer map(String name, IntBuffer intBuffer) throws IOException
+ {
+ final File bufferDir = new File(tempDir, name);
+ FileUtils.mkdirp(bufferDir);
+ final IdConversionSerializer serializer = new
IdConversionSerializer(intBuffer);
+ return closer.register(new PersistedIdConversion(bufferDir,
serializer)).getBuffer();
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ try {
+ closer.close();
+ }
+ finally {
+ FileUtils.deleteDirectory(tempDir);
+ }
+ }
+ }
+
+ /**
+ * Persistent dictionary id conversion mappings, artifacts created during
segment merge which map old dictionary ids
+ * to new dictionary ids. These persistent mappings are only used when the
id mapping needs a lifetime longer than
+ * the merge of the column itself, such as when the column being merged is a
'parent' column of a projection.
+ *
+ * @see DimensionMergerV9#markAsParent()
+ * @see DimensionMergerV9#attachParent(DimensionMergerV9, List)
+ */
+ protected static class PersistedIdConversion implements Closeable
+ {
+ private final File idConversionFile;
+ private final SmooshedFileMapper bufferMapper;
+ private final IntBuffer buffer;
+
+ private boolean isClosed;
+
+ protected PersistedIdConversion(File idConversionDir, Serializer
idConversionSerializer) throws IOException
+ {
+ this.idConversionFile = idConversionDir;
+ this.bufferMapper = ColumnSerializerUtils.mapSerializer(idConversionDir,
idConversionSerializer, idConversionDir.getName());
+ final ByteBuffer mappedBuffer =
bufferMapper.mapFile(idConversionDir.getName());
+ mappedBuffer.order(ByteOrder.nativeOrder());
+ this.buffer = mappedBuffer.asIntBuffer();
+ }
+
+ @Nullable
+ public IntBuffer getBuffer()
+ {
+ if (isClosed) {
+ return null;
+ }
+ return buffer;
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ if (!isClosed) {
+ isClosed = true;
+ bufferMapper.close();
+ FileUtils.deleteDirectory(idConversionFile);
+ }
+ }
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/DimensionHandler.java
b/processing/src/main/java/org/apache/druid/segment/DimensionHandler.java
index 4ffc7ee04c4..689dfde16cd 100644
--- a/processing/src/main/java/org/apache/druid/segment/DimensionHandler.java
+++ b/processing/src/main/java/org/apache/druid/segment/DimensionHandler.java
@@ -21,6 +21,7 @@ package org.apache.druid.segment;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionSchema.MultiValueHandling;
+import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
@@ -29,6 +30,7 @@ import
org.apache.druid.segment.selector.settable.SettableColumnValueSelector;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import javax.annotation.Nullable;
+import java.io.File;
import java.util.Comparator;
/**
@@ -100,6 +102,28 @@ public interface DimensionHandler
*/
DimensionIndexer<EncodedType, EncodedKeyComponentType, ActualType>
makeIndexer(boolean useMaxMemoryEstimates);
+ /**
+ * @deprecated use {@link #makeMerger(String, IndexSpec,
SegmentWriteOutMedium, ColumnCapabilities, ProgressIndicator, File, Closer)}
+ *
+ * This method exists for backwards compatibility with older versions of
Druid since this is an unofficial extension
+ * point that must be implemented to create custom dimension types, and will
be removed in a future release.
+ */
+ @Deprecated
+ default DimensionMergerV9 makeMerger(
+ String outputName,
+ IndexSpec indexSpec,
+ SegmentWriteOutMedium segmentWriteOutMedium,
+ ColumnCapabilities capabilities,
+ ProgressIndicator progress,
+ Closer closer
+ )
+ {
+ throw DruidException.defensive(
+ "this method is no longer supported, use makeMerger(String, IndexSpec,
SegmentWriteOutMedium, ColumnCapabilities, ProgressIndicator, File, Closer)
instead"
+ );
+ }
+
+
/**
* Creates a new DimensionMergerV9, a per-dimension object responsible for
merging indexes/row data across segments
* and building the on-disk representation of a dimension. For use with
IndexMergerV9 only.
@@ -113,16 +137,32 @@ public interface DimensionHandler
* needed
* @param capabilities The ColumnCapabilities of the dimension
represented by this DimensionHandler
* @param progress ProgressIndicator used by the merging process
+ * @param segmentBaseDir segment write out path; temporary files may
be created here, though they should be deleted
+ * after merge is finished OR be registered
with the Closer parameter
+ * @param closer Closer tied to segment completion. Anything
which is not cleaned up inside of the
+ * merger after merge is complete should be
registered with this closer. For example,
+ * resources which are required for final
serialization of the column
* @return A new DimensionMergerV9 object.
*/
- DimensionMergerV9 makeMerger(
+ default DimensionMergerV9 makeMerger(
String outputName,
IndexSpec indexSpec,
SegmentWriteOutMedium segmentWriteOutMedium,
ColumnCapabilities capabilities,
ProgressIndicator progress,
+ File segmentBaseDir,
Closer closer
- );
+ )
+ {
+ return makeMerger(
+ outputName,
+ indexSpec,
+ segmentWriteOutMedium,
+ capabilities,
+ progress,
+ closer
+ );
+ }
/**
* Given an key component representing a single set of row value(s) for this
dimension as an Object,
diff --git
a/processing/src/main/java/org/apache/druid/segment/DimensionMergerV9.java
b/processing/src/main/java/org/apache/druid/segment/DimensionMergerV9.java
index 5e3f5cad886..26802745ca1 100644
--- a/processing/src/main/java/org/apache/druid/segment/DimensionMergerV9.java
+++ b/processing/src/main/java/org/apache/druid/segment/DimensionMergerV9.java
@@ -39,12 +39,27 @@ public interface DimensionMergerV9 extends DimensionMerger
*/
ColumnDescriptor makeColumnDescriptor();
+ /**
+ * Sets this merger as the "parent" of another merger for a "projection",
allowing for this merger to preserve any
+ * state which might be required for the projection mergers to do their
thing. This method MUST be called prior to
+ * performing any merge work. Typically, this method is only implemented if
+ * {@link #attachParent(DimensionMergerV9, List)} requires it.
+ */
+ default void markAsParent()
+ {
+ // do nothing
+ }
+
/**
* Attaches the {@link DimensionMergerV9} of a "projection" parent column so
that stuff like value dictionaries can
- * be shared between parent and child
+ * be shared between parent and child. This method is called during merging
instead of {@link #writeMergedValueDictionary(List)} if
+ * the parent column exists.
+ *
+ * @see IndexMergerV9#makeProjections
*/
default void attachParent(DimensionMergerV9 parent, List<IndexableAdapter>
projectionAdapters) throws IOException
{
- // do nothing
+ // by default fall through to writing merged dictionary
+ writeMergedValueDictionary(projectionAdapters);
}
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/DoubleDimensionHandler.java
b/processing/src/main/java/org/apache/druid/segment/DoubleDimensionHandler.java
index 035384e522b..e8b9b9a2c9c 100644
---
a/processing/src/main/java/org/apache/druid/segment/DoubleDimensionHandler.java
+++
b/processing/src/main/java/org/apache/druid/segment/DoubleDimensionHandler.java
@@ -30,6 +30,7 @@ import
org.apache.druid.segment.selector.settable.SettableColumnValueSelector;
import
org.apache.druid.segment.selector.settable.SettableDoubleColumnValueSelector;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import java.io.File;
import java.util.Comparator;
public class DoubleDimensionHandler implements DimensionHandler<Double,
Double, Double>
@@ -82,6 +83,7 @@ public class DoubleDimensionHandler implements
DimensionHandler<Double, Double,
SegmentWriteOutMedium segmentWriteOutMedium,
ColumnCapabilities capabilities,
ProgressIndicator progress,
+ File segmentBaseDir,
Closer closer
)
{
diff --git
a/processing/src/main/java/org/apache/druid/segment/FloatDimensionHandler.java
b/processing/src/main/java/org/apache/druid/segment/FloatDimensionHandler.java
index 925fefaa9be..0defd19c0a9 100644
---
a/processing/src/main/java/org/apache/druid/segment/FloatDimensionHandler.java
+++
b/processing/src/main/java/org/apache/druid/segment/FloatDimensionHandler.java
@@ -30,6 +30,7 @@ import
org.apache.druid.segment.selector.settable.SettableColumnValueSelector;
import
org.apache.druid.segment.selector.settable.SettableFloatColumnValueSelector;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import java.io.File;
import java.util.Comparator;
public class FloatDimensionHandler implements DimensionHandler<Float, Float,
Float>
@@ -82,6 +83,7 @@ public class FloatDimensionHandler implements
DimensionHandler<Float, Float, Flo
SegmentWriteOutMedium segmentWriteOutMedium,
ColumnCapabilities capabilities,
ProgressIndicator progress,
+ File segmentBaseDir,
Closer closer
)
{
diff --git a/processing/src/main/java/org/apache/druid/segment/IndexIO.java
b/processing/src/main/java/org/apache/druid/segment/IndexIO.java
index 9cab1ce1928..8470c63f3e7 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexIO.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexIO.java
@@ -719,6 +719,7 @@ public class IndexIO
if
(groupingColumn.equals(projectionSpec.getSchema().getTimeColumnName())) {
projectionColumns.put(ColumnHolder.TIME_COLUMN_NAME,
projectionColumns.get(groupingColumn));
+ projectionColumns.remove(groupingColumn);
}
}
for (AggregatorFactory aggregator :
projectionSpec.getSchema().getAggregators()) {
diff --git
a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
index 04cc579208d..7da89edaff7 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
@@ -227,12 +227,24 @@ public class IndexMergerV9 implements IndexMerger
segmentWriteOutMedium,
dimFormats.get(i).toColumnCapabilities(),
progress,
+ outDir,
closer
);
mergers.add(merger);
mergersMap.put(mergedDimensions.get(i), merger);
}
+ if (segmentMetadata != null && segmentMetadata.getProjections() != null)
{
+ for (AggregateProjectionMetadata projectionMetadata :
segmentMetadata.getProjections()) {
+ for (String dimension :
projectionMetadata.getSchema().getGroupingColumns()) {
+ DimensionMergerV9 merger = mergersMap.get(dimension);
+ if (merger != null) {
+ merger.markAsParent();
+ }
+ }
+ }
+ }
+
/************* Setup Dim Conversions **************/
progress.progress();
startTime = System.currentTimeMillis();
@@ -302,6 +314,7 @@ public class IndexMergerV9 implements IndexMerger
indexSpec,
segmentWriteOutMedium,
progress,
+ outDir,
closer,
mergersMap,
segmentMetadata
@@ -356,6 +369,7 @@ public class IndexMergerV9 implements IndexMerger
final IndexSpec indexSpec,
final SegmentWriteOutMedium segmentWriteOutMedium,
final ProgressIndicator progress,
+ final File segmentBaseDir,
final Closer closer,
final Map<String, DimensionMergerV9> parentMergers,
final Metadata segmentMetadata
@@ -389,6 +403,7 @@ public class IndexMergerV9 implements IndexMerger
segmentWriteOutMedium,
dimensionFormat.toColumnCapabilities(),
progress,
+ segmentBaseDir,
closer
);
if (parentMergers.containsKey(dimension)) {
diff --git
a/processing/src/main/java/org/apache/druid/segment/LongDimensionHandler.java
b/processing/src/main/java/org/apache/druid/segment/LongDimensionHandler.java
index e93a71be883..928bd8e2b13 100644
---
a/processing/src/main/java/org/apache/druid/segment/LongDimensionHandler.java
+++
b/processing/src/main/java/org/apache/druid/segment/LongDimensionHandler.java
@@ -30,6 +30,7 @@ import
org.apache.druid.segment.selector.settable.SettableColumnValueSelector;
import
org.apache.druid.segment.selector.settable.SettableLongColumnValueSelector;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import java.io.File;
import java.util.Comparator;
public class LongDimensionHandler implements DimensionHandler<Long, Long, Long>
@@ -82,6 +83,7 @@ public class LongDimensionHandler implements
DimensionHandler<Long, Long, Long>
SegmentWriteOutMedium segmentWriteOutMedium,
ColumnCapabilities capabilities,
ProgressIndicator progress,
+ File segmentBaseDir,
Closer closer
)
{
diff --git
a/processing/src/main/java/org/apache/druid/segment/NestedCommonFormatColumnHandler.java
b/processing/src/main/java/org/apache/druid/segment/NestedCommonFormatColumnHandler.java
index 348d40e4f2d..ab3f9053bf1 100644
---
a/processing/src/main/java/org/apache/druid/segment/NestedCommonFormatColumnHandler.java
+++
b/processing/src/main/java/org/apache/druid/segment/NestedCommonFormatColumnHandler.java
@@ -31,6 +31,7 @@ import
org.apache.druid.segment.selector.settable.SettableObjectColumnValueSelec
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import javax.annotation.Nullable;
+import java.io.File;
import java.util.Comparator;
public class NestedCommonFormatColumnHandler implements
DimensionHandler<StructuredData, StructuredData, StructuredData>
@@ -82,10 +83,11 @@ public class NestedCommonFormatColumnHandler implements
DimensionHandler<Structu
SegmentWriteOutMedium segmentWriteOutMedium,
ColumnCapabilities capabilities,
ProgressIndicator progress,
+ File segmentBaseDir,
Closer closer
)
{
- return new AutoTypeColumnMerger(name, outputName, castTo, indexSpec,
segmentWriteOutMedium, closer);
+ return new AutoTypeColumnMerger(name, outputName, castTo, indexSpec,
segmentWriteOutMedium, segmentBaseDir, closer);
}
@Override
diff --git
a/processing/src/main/java/org/apache/druid/segment/NestedDataColumnHandlerV4.java
b/processing/src/main/java/org/apache/druid/segment/NestedDataColumnHandlerV4.java
index d74933faaf1..effb74f8c32 100644
---
a/processing/src/main/java/org/apache/druid/segment/NestedDataColumnHandlerV4.java
+++
b/processing/src/main/java/org/apache/druid/segment/NestedDataColumnHandlerV4.java
@@ -30,6 +30,7 @@ import
org.apache.druid.segment.selector.settable.SettableColumnValueSelector;
import
org.apache.druid.segment.selector.settable.SettableObjectColumnValueSelector;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import java.io.File;
import java.util.Comparator;
public class NestedDataColumnHandlerV4 implements
DimensionHandler<StructuredData, StructuredData, StructuredData>
@@ -78,6 +79,7 @@ public class NestedDataColumnHandlerV4 implements
DimensionHandler<StructuredDat
SegmentWriteOutMedium segmentWriteOutMedium,
ColumnCapabilities capabilities,
ProgressIndicator progress,
+ File segmentBaseDir,
Closer closer
)
{
diff --git
a/processing/src/main/java/org/apache/druid/segment/SimpleQueryableIndex.java
b/processing/src/main/java/org/apache/druid/segment/SimpleQueryableIndex.java
index 6b48d304b80..ed67d482622 100644
---
a/processing/src/main/java/org/apache/druid/segment/SimpleQueryableIndex.java
+++
b/processing/src/main/java/org/apache/druid/segment/SimpleQueryableIndex.java
@@ -44,6 +44,7 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
+import java.util.stream.Collectors;
/**
*
@@ -239,21 +240,36 @@ public abstract class SimpleQueryableIndex implements
QueryableIndex
public QueryableIndex getProjectionQueryableIndex(String name)
{
final AggregateProjectionMetadata projectionSpec =
projectionsMap.get(name);
+ final Metadata projectionMetadata = new Metadata(
+ null,
+ projectionSpec.getSchema().getAggregators(),
+ null,
+ null,
+ true,
+ projectionSpec.getSchema().getOrderingWithTimeColumnSubstitution(),
+ null
+ );
return new SimpleQueryableIndex(
dataInterval,
- new ListIndexed<>(projectionSpec.getSchema().getGroupingColumns()),
+ new ListIndexed<>(
+ projectionSpec.getSchema()
+ .getGroupingColumns()
+ .stream()
+ .filter(x ->
!x.equals(projectionSpec.getSchema().getTimeColumnName()))
+ .collect(Collectors.toList())
+ ),
bitmapFactory,
projectionColumns.get(name),
fileMapper,
true,
- null,
+ projectionMetadata,
null
)
{
@Override
public Metadata getMetadata()
{
- return null;
+ return projectionMetadata;
}
@Override
diff --git
a/processing/src/main/java/org/apache/druid/segment/StringDimensionHandler.java
b/processing/src/main/java/org/apache/druid/segment/StringDimensionHandler.java
index 3bb1e721727..37e62bad64c 100644
---
a/processing/src/main/java/org/apache/druid/segment/StringDimensionHandler.java
+++
b/processing/src/main/java/org/apache/druid/segment/StringDimensionHandler.java
@@ -32,6 +32,7 @@ import
org.apache.druid.segment.selector.settable.SettableColumnValueSelector;
import
org.apache.druid.segment.selector.settable.SettableDimensionValueSelector;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import java.io.File;
import java.util.Collections;
import java.util.Comparator;
@@ -169,6 +170,7 @@ public class StringDimensionHandler implements
DimensionHandler<Integer, int[],
SegmentWriteOutMedium segmentWriteOutMedium,
ColumnCapabilities capabilities,
ProgressIndicator progress,
+ File segmentBaseDir,
Closer closer
)
{
@@ -188,6 +190,7 @@ public class StringDimensionHandler implements
DimensionHandler<Integer, int[],
segmentWriteOutMedium,
capabilities,
progress,
+ segmentBaseDir,
closer
);
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java
b/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java
index 8c71d418556..b0f9c6f466a 100644
---
a/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java
+++
b/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java
@@ -48,6 +48,7 @@ import
org.apache.druid.segment.serde.DictionaryEncodedColumnPartSerde;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import javax.annotation.Nullable;
+import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
@@ -84,10 +85,11 @@ public class StringDimensionMergerV9 extends
DictionaryEncodedColumnMerger<Strin
SegmentWriteOutMedium segmentWriteOutMedium,
ColumnCapabilities capabilities,
ProgressIndicator progress,
+ File segmentBaseDir,
Closer closer
)
{
- super(dimensionName, outputName, indexSpec, segmentWriteOutMedium,
capabilities, progress, closer);
+ super(dimensionName, outputName, indexSpec, segmentWriteOutMedium,
capabilities, progress, segmentBaseDir, closer);
}
@Override
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/DictionaryIdLookup.java
b/processing/src/main/java/org/apache/druid/segment/nested/DictionaryIdLookup.java
index 4569df39ef6..c22dfde831f 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/DictionaryIdLookup.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/DictionaryIdLookup.java
@@ -19,15 +19,11 @@
package org.apache.druid.segment.nested;
-import com.google.common.primitives.Ints;
-import org.apache.druid.annotations.SuppressFBWarnings;
import org.apache.druid.error.DruidException;
-import org.apache.druid.java.util.common.ByteBufferUtils;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
-import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter;
import org.apache.druid.segment.column.StringEncodingStrategies;
import org.apache.druid.segment.column.TypeStrategies;
import org.apache.druid.segment.data.DictionaryWriter;
@@ -35,6 +31,7 @@ import org.apache.druid.segment.data.FixedIndexed;
import org.apache.druid.segment.data.FrontCodedIntArrayIndexed;
import org.apache.druid.segment.data.Indexed;
import org.apache.druid.segment.serde.ColumnSerializerUtils;
+import org.apache.druid.utils.CloseableUtils;
import javax.annotation.Nullable;
import java.io.Closeable;
@@ -42,13 +39,6 @@ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.GatheringByteChannel;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
-import java.util.EnumSet;
/**
* Value to dictionary id lookup, backed with memory mapped dictionaries
populated lazily by the supplied
@@ -56,36 +46,38 @@ import java.util.EnumSet;
*/
public final class DictionaryIdLookup implements Closeable
{
+
private final String name;
- private final Path tempBasePath;
+ private final File tempBasePath;
@Nullable
private final DictionaryWriter<String> stringDictionaryWriter;
- private Path stringDictionaryFile = null;
+ private File stringDictionaryFile = null;
private SmooshedFileMapper stringBufferMapper = null;
private Indexed<ByteBuffer> stringDictionary = null;
@Nullable
private final DictionaryWriter<Long> longDictionaryWriter;
- private Path longDictionaryFile = null;
- private MappedByteBuffer longBuffer = null;
+ private File longDictionaryFile = null;
+ private SmooshedFileMapper longBufferMapper = null;
private FixedIndexed<Long> longDictionary = null;
@Nullable
private final DictionaryWriter<Double> doubleDictionaryWriter;
- private Path doubleDictionaryFile = null;
- MappedByteBuffer doubleBuffer = null;
+ private File doubleDictionaryFile = null;
+ SmooshedFileMapper doubleBufferMapper = null;
FixedIndexed<Double> doubleDictionary = null;
@Nullable
private final DictionaryWriter<int[]> arrayDictionaryWriter;
- private Path arrayDictionaryFile = null;
- private MappedByteBuffer arrayBuffer = null;
+ private File arrayDictionaryFile = null;
+ private SmooshedFileMapper arrayBufferMapper = null;
private FrontCodedIntArrayIndexed arrayDictionary = null;
+ private final Closer closer = Closer.create();
public DictionaryIdLookup(
String name,
- Path tempBasePath,
+ File tempBaseDir,
@Nullable DictionaryWriter<String> stringDictionaryWriter,
@Nullable DictionaryWriter<Long> longDictionaryWriter,
@Nullable DictionaryWriter<Double> doubleDictionaryWriter,
@@ -93,7 +85,7 @@ public final class DictionaryIdLookup implements Closeable
)
{
this.name = name;
- this.tempBasePath = tempBasePath;
+ this.tempBasePath = tempBaseDir;
this.stringDictionaryWriter = stringDictionaryWriter;
this.longDictionaryWriter = longDictionaryWriter;
this.doubleDictionaryWriter = doubleDictionaryWriter;
@@ -172,42 +164,27 @@ public final class DictionaryIdLookup implements Closeable
}
@Nullable
- public ByteBuffer getLongBuffer()
+ public SmooshedFileMapper getLongBufferMapper()
{
- return longBuffer;
+ return longBufferMapper;
}
@Nullable
- public ByteBuffer getDoubleBuffer()
+ public SmooshedFileMapper getDoubleBufferMapper()
{
- return doubleBuffer;
+ return doubleBufferMapper;
}
@Nullable
- public ByteBuffer getArrayBuffer()
+ public SmooshedFileMapper getArrayBufferMapper()
{
- return arrayBuffer;
+ return arrayBufferMapper;
}
@Override
public void close()
{
- if (stringBufferMapper != null) {
- stringBufferMapper.close();
- deleteTempFile(stringDictionaryFile);
- }
- if (longBuffer != null) {
- ByteBufferUtils.unmap(longBuffer);
- deleteTempFile(longDictionaryFile);
- }
- if (doubleBuffer != null) {
- ByteBufferUtils.unmap(doubleBuffer);
- deleteTempFile(doubleDictionaryFile);
- }
- if (arrayBuffer != null) {
- ByteBufferUtils.unmap(arrayBuffer);
- deleteTempFile(arrayDictionaryFile);
- }
+ CloseableUtils.closeAndWrapExceptions(closer);
}
private int longOffset()
@@ -228,28 +205,16 @@ public final class DictionaryIdLookup implements Closeable
private void ensureStringDictionaryLoaded()
{
if (stringDictionary == null) {
- // GenericIndexed v2 can write to multiple files if the dictionary is
larger than 2gb, so we use a smooshfile
- // for strings because of this. if other type dictionary writers could
potentially use multiple internal files
- // in the future, we should transition them to using this approach as
well (or build a combination smoosher and
- // mapper so that we can have a mutable smoosh)
- File stringSmoosh = FileUtils.createTempDirInLocation(tempBasePath,
StringUtils.urlEncode(name) + "__stringTempSmoosh");
- stringDictionaryFile = stringSmoosh.toPath();
final String fileName = ColumnSerializerUtils.getInternalFileName(
name,
ColumnSerializerUtils.STRING_DICTIONARY_FILE_NAME
);
+ stringDictionaryFile = makeTempDir(fileName);
+ stringBufferMapper = closer.register(
+ ColumnSerializerUtils.mapSerializer(stringDictionaryFile,
stringDictionaryWriter, fileName)
+ );
- try (
- final FileSmoosher smoosher = new FileSmoosher(stringSmoosh);
- final SmooshedWriter writer = smoosher.addWithSmooshedWriter(
- fileName,
- stringDictionaryWriter.getSerializedSize()
- )
- ) {
- stringDictionaryWriter.writeTo(writer, smoosher);
- writer.close();
- smoosher.close();
- stringBufferMapper = SmooshedFileMapper.load(stringSmoosh);
+ try {
final ByteBuffer stringBuffer = stringBufferMapper.mapFile(fileName);
stringDictionary =
StringEncodingStrategies.getStringDictionarySupplier(
stringBufferMapper,
@@ -266,148 +231,79 @@ public final class DictionaryIdLookup implements
Closeable
private void ensureLongDictionaryLoaded()
{
if (longDictionary == null) {
- longDictionaryFile = makeTempFile(name +
ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME);
- longBuffer = mapWriter(longDictionaryFile, longDictionaryWriter);
- longDictionary = FixedIndexed.read(longBuffer, TypeStrategies.LONG,
ByteOrder.nativeOrder(), Long.BYTES).get();
- // reset position
- longBuffer.position(0);
+ final String fileName = ColumnSerializerUtils.getInternalFileName(
+ name,
+ ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME
+ );
+ longDictionaryFile = makeTempDir(fileName);
+ longBufferMapper = closer.register(
+ ColumnSerializerUtils.mapSerializer(longDictionaryFile,
longDictionaryWriter, fileName)
+ );
+ try {
+ final ByteBuffer buffer = longBufferMapper.mapFile(fileName);
+ longDictionary = FixedIndexed.read(buffer, TypeStrategies.LONG,
ByteOrder.nativeOrder(), Long.BYTES).get();
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
}
private void ensureDoubleDictionaryLoaded()
{
if (doubleDictionary == null) {
- doubleDictionaryFile = makeTempFile(name +
ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME);
- doubleBuffer = mapWriter(doubleDictionaryFile, doubleDictionaryWriter);
- doubleDictionary = FixedIndexed.read(
- doubleBuffer,
- TypeStrategies.DOUBLE,
- ByteOrder.nativeOrder(),
- Double.BYTES
- ).get();
- // reset position
- doubleBuffer.position(0);
+ final String fileName = ColumnSerializerUtils.getInternalFileName(
+ name,
+ ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME
+ );
+ doubleDictionaryFile = makeTempDir(fileName);
+ doubleBufferMapper = closer.register(
+ ColumnSerializerUtils.mapSerializer(doubleDictionaryFile,
doubleDictionaryWriter, fileName)
+ );
+ try {
+ final ByteBuffer buffer = doubleBufferMapper.mapFile(fileName);
+ doubleDictionary = FixedIndexed.read(buffer, TypeStrategies.DOUBLE,
ByteOrder.nativeOrder(), Long.BYTES).get();
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
}
private void ensureArrayDictionaryLoaded()
{
if (arrayDictionary == null && arrayDictionaryWriter != null) {
- arrayDictionaryFile = makeTempFile(name +
ColumnSerializerUtils.ARRAY_DICTIONARY_FILE_NAME);
- arrayBuffer = mapWriter(arrayDictionaryFile, arrayDictionaryWriter);
- arrayDictionary = FrontCodedIntArrayIndexed.read(arrayBuffer,
ByteOrder.nativeOrder()).get();
- // reset position
- arrayBuffer.position(0);
- }
- }
-
- private Path makeTempFile(String name)
- {
- try {
- return Files.createTempFile(tempBasePath, StringUtils.urlEncode(name),
null);
- }
- catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- private void deleteTempFile(Path path)
- {
- try {
- final File file = path.toFile();
- if (file.isDirectory()) {
- FileUtils.deleteDirectory(file);
- } else {
- Files.delete(path);
+ final String fileName = ColumnSerializerUtils.getInternalFileName(
+ name,
+ ColumnSerializerUtils.ARRAY_DICTIONARY_FILE_NAME
+ );
+ arrayDictionaryFile = makeTempDir(fileName);
+ arrayBufferMapper = closer.register(
+ ColumnSerializerUtils.mapSerializer(arrayDictionaryFile,
arrayDictionaryWriter, fileName)
+ );
+ try {
+ final ByteBuffer buffer = arrayBufferMapper.mapFile(fileName);
+ arrayDictionary = FrontCodedIntArrayIndexed.read(buffer,
ByteOrder.nativeOrder()).get();
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
}
- }
- catch (IOException e) {
- throw new RuntimeException(e);
}
}
- @SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION")
- private MappedByteBuffer mapWriter(Path path, DictionaryWriter<?> writer)
+ private File makeTempDir(String fileName)
{
- final EnumSet<StandardOpenOption> options = EnumSet.of(
- StandardOpenOption.READ,
- StandardOpenOption.WRITE,
- StandardOpenOption.CREATE,
- StandardOpenOption.TRUNCATE_EXISTING
- );
-
- try (FileChannel fileChannel = FileChannel.open(path, options);
- GatheringByteChannel smooshChannel = makeWriter(fileChannel,
writer.getSerializedSize())) {
- //noinspection DataFlowIssue
- writer.writeTo(smooshChannel, null);
- return fileChannel.map(FileChannel.MapMode.READ_ONLY, 0,
writer.getSerializedSize());
+ try {
+ final File f = new File(tempBasePath, StringUtils.urlEncode(fileName));
+ FileUtils.mkdirp(f);
+ closer.register(() -> FileUtils.deleteDirectory(f));
+ return f;
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
- private GatheringByteChannel makeWriter(FileChannel channel, long size)
- {
- // basically same code as smooshed writer, can't use channel directly
because copying between channels
- // doesn't handle size of source channel correctly
- return new GatheringByteChannel()
- {
- private boolean isClosed = false;
- private long currOffset = 0;
-
- @Override
- public boolean isOpen()
- {
- return !isClosed;
- }
-
- @Override
- public void close() throws IOException
- {
- channel.close();
- isClosed = true;
- }
-
- public int bytesLeft()
- {
- return (int) (size - currOffset);
- }
-
- @Override
- public int write(ByteBuffer buffer) throws IOException
- {
- return addToOffset(channel.write(buffer));
- }
-
- @Override
- public long write(ByteBuffer[] srcs, int offset, int length) throws
IOException
- {
- return addToOffset(channel.write(srcs, offset, length));
- }
-
- @Override
- public long write(ByteBuffer[] srcs) throws IOException
- {
- return addToOffset(channel.write(srcs));
- }
-
- public int addToOffset(long numBytesWritten)
- {
- if (numBytesWritten > bytesLeft()) {
- throw DruidException.defensive(
- "Wrote more bytes[%,d] than available[%,d]. Don't do that.",
- numBytesWritten,
- bytesLeft()
- );
- }
- currOffset += numBytesWritten;
-
- return Ints.checkedCast(numBytesWritten);
- }
- };
- }
-
public int getStringCardinality()
{
ensureStringDictionaryLoaded();
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/NestedCommonFormatColumnSerializer.java
b/processing/src/main/java/org/apache/druid/segment/nested/NestedCommonFormatColumnSerializer.java
index 50bca997735..32bf670b003 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/NestedCommonFormatColumnSerializer.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/NestedCommonFormatColumnSerializer.java
@@ -21,11 +21,13 @@ package org.apache.druid.segment.nested;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.data.VByte;
import org.apache.druid.segment.serde.ColumnSerializerUtils;
import org.apache.druid.segment.serde.Serializer;
+import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@@ -53,7 +55,7 @@ public abstract class NestedCommonFormatColumnSerializer
implements GenericColum
public static final String RAW_FILE_NAME = "__raw";
public static final String NESTED_FIELD_PREFIX = "__field_";
- public abstract void openDictionaryWriter() throws IOException;
+ public abstract void openDictionaryWriter(File segmentBaseDir) throws
IOException;
public void serializeFields(SortedMap<String, FieldTypeInfo.MutableTypeSet>
fields) throws IOException
{
@@ -80,9 +82,11 @@ public abstract class NestedCommonFormatColumnSerializer
implements GenericColum
ColumnSerializerUtils.writeInternal(smoosher, serializer, getColumnName(),
fileName);
}
- protected void writeInternal(FileSmoosher smoosher, ByteBuffer buffer,
String fileName) throws IOException
+ protected void copyFromTempSmoosh(FileSmoosher smoosher, SmooshedFileMapper
fileMapper) throws IOException
{
- ColumnSerializerUtils.writeInternal(smoosher, buffer, getColumnName(),
fileName);
+ for (String internalName : fileMapper.getInternalFilenames()) {
+ smoosher.add(internalName, fileMapper.mapFile(internalName));
+ }
}
protected void writeV0Header(WritableByteChannel channel, ByteBuffer
columnNameBuffer) throws IOException
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
index 6a3405e58fc..f7c2cbc1b59 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
@@ -24,12 +24,10 @@ import com.google.common.collect.Maps;
import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.collections.bitmap.MutableBitmap;
import org.apache.druid.common.config.NullHandling;
-import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
-import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.math.expr.ExprEval;
@@ -51,6 +49,7 @@ import org.apache.druid.segment.serde.ColumnSerializerUtils;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import javax.annotation.Nullable;
+import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@@ -201,7 +200,7 @@ public class NestedDataColumnSerializer extends
NestedCommonFormatColumnSerializ
}
@Override
- public void openDictionaryWriter() throws IOException
+ public void openDictionaryWriter(File segmentBaseDir) throws IOException
{
fieldsWriter = new GenericIndexedWriter<>(segmentWriteOutMedium, name,
GenericIndexed.STRING_STRATEGY);
fieldsWriter.open();
@@ -243,7 +242,7 @@ public class NestedDataColumnSerializer extends
NestedCommonFormatColumnSerializ
globalDictionaryIdLookup = closer.register(
new DictionaryIdLookup(
name,
- FileUtils.getTempDir(),
+ segmentBaseDir,
dictionaryWriter,
longDictionaryWriter,
doubleDictionaryWriter,
@@ -440,37 +439,22 @@ public class NestedDataColumnSerializer extends
NestedCommonFormatColumnSerializ
if (writeDictionary) {
if (globalDictionaryIdLookup.getStringBufferMapper() != null) {
- SmooshedFileMapper fileMapper =
globalDictionaryIdLookup.getStringBufferMapper();
- for (String internalName : fileMapper.getInternalFilenames()) {
- smoosher.add(internalName, fileMapper.mapFile(internalName));
- }
+ copyFromTempSmoosh(smoosher,
globalDictionaryIdLookup.getStringBufferMapper());
} else {
writeInternal(smoosher, dictionaryWriter,
ColumnSerializerUtils.STRING_DICTIONARY_FILE_NAME);
}
- if (globalDictionaryIdLookup.getLongBuffer() != null) {
- writeInternal(
- smoosher,
- globalDictionaryIdLookup.getLongBuffer(),
- ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME
- );
+ if (globalDictionaryIdLookup.getLongBufferMapper() != null) {
+ copyFromTempSmoosh(smoosher,
globalDictionaryIdLookup.getLongBufferMapper());
} else {
writeInternal(smoosher, longDictionaryWriter,
ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME);
}
- if (globalDictionaryIdLookup.getDoubleBuffer() != null) {
- writeInternal(
- smoosher,
- globalDictionaryIdLookup.getDoubleBuffer(),
- ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME
- );
+ if (globalDictionaryIdLookup.getDoubleBufferMapper() != null) {
+ copyFromTempSmoosh(smoosher,
globalDictionaryIdLookup.getDoubleBufferMapper());
} else {
writeInternal(smoosher, doubleDictionaryWriter,
ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME);
}
- if (globalDictionaryIdLookup.getArrayBuffer() != null) {
- writeInternal(
- smoosher,
- globalDictionaryIdLookup.getArrayBuffer(),
- ColumnSerializerUtils.ARRAY_DICTIONARY_FILE_NAME
- );
+ if (globalDictionaryIdLookup.getArrayBufferMapper() != null) {
+ copyFromTempSmoosh(smoosher,
globalDictionaryIdLookup.getArrayBufferMapper());
} else {
writeInternal(smoosher, arrayDictionaryWriter,
ColumnSerializerUtils.ARRAY_DICTIONARY_FILE_NAME);
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializerV4.java
b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializerV4.java
index bb015ecce92..87240f3fadc 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializerV4.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializerV4.java
@@ -200,7 +200,7 @@ public class NestedDataColumnSerializerV4 implements
GenericColumnSerializer<Str
globalDictionaryIdLookup = closer.register(
new DictionaryIdLookup(
name,
- FileUtils.getTempDir(),
+ FileUtils.getTempDir().toFile(),
dictionaryWriter,
longDictionaryWriter,
doubleDictionaryWriter,
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnSerializer.java
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnSerializer.java
index 050848f6503..52d798149fd 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnSerializer.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnSerializer.java
@@ -19,7 +19,6 @@
package org.apache.druid.segment.nested;
-import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
@@ -35,6 +34,7 @@ import org.apache.druid.segment.serde.ColumnSerializerUtils;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import javax.annotation.Nullable;
+import java.io.File;
import java.io.IOException;
import java.nio.ByteOrder;
@@ -66,7 +66,7 @@ public class ScalarDoubleColumnSerializer extends
ScalarNestedCommonFormatColumn
}
@Override
- public void openDictionaryWriter() throws IOException
+ public void openDictionaryWriter(File segmentBaseDir) throws IOException
{
dictionaryWriter = new FixedIndexedWriter<>(
segmentWriteOutMedium,
@@ -79,7 +79,7 @@ public class ScalarDoubleColumnSerializer extends
ScalarNestedCommonFormatColumn
dictionaryIdLookup = closer.register(
new DictionaryIdLookup(
name,
- FileUtils.getTempDir(),
+ segmentBaseDir,
null,
null,
dictionaryWriter,
@@ -136,8 +136,8 @@ public class ScalarDoubleColumnSerializer extends
ScalarNestedCommonFormatColumn
@Override
protected void writeDictionaryFile(FileSmoosher smoosher) throws IOException
{
- if (dictionaryIdLookup.getDoubleBuffer() != null) {
- writeInternal(smoosher, dictionaryIdLookup.getDoubleBuffer(),
ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME);
+ if (dictionaryIdLookup.getDoubleBufferMapper() != null) {
+ copyFromTempSmoosh(smoosher, dictionaryIdLookup.getDoubleBufferMapper());
} else {
writeInternal(smoosher, dictionaryWriter,
ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME);
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnSerializer.java
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnSerializer.java
index 89408b20388..e62c1ebe8aa 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnSerializer.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnSerializer.java
@@ -19,7 +19,6 @@
package org.apache.druid.segment.nested;
-import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
@@ -35,6 +34,7 @@ import org.apache.druid.segment.serde.ColumnSerializerUtils;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import javax.annotation.Nullable;
+import java.io.File;
import java.io.IOException;
import java.nio.ByteOrder;
@@ -67,7 +67,7 @@ public class ScalarLongColumnSerializer extends
ScalarNestedCommonFormatColumnSe
}
@Override
- public void openDictionaryWriter() throws IOException
+ public void openDictionaryWriter(File segmentBaseDir) throws IOException
{
dictionaryWriter = new FixedIndexedWriter<>(
segmentWriteOutMedium,
@@ -80,7 +80,7 @@ public class ScalarLongColumnSerializer extends
ScalarNestedCommonFormatColumnSe
dictionaryIdLookup = closer.register(
new DictionaryIdLookup(
name,
- FileUtils.getTempDir(),
+ segmentBaseDir,
null,
dictionaryWriter,
null,
@@ -136,8 +136,8 @@ public class ScalarLongColumnSerializer extends
ScalarNestedCommonFormatColumnSe
@Override
protected void writeDictionaryFile(FileSmoosher smoosher) throws IOException
{
- if (dictionaryIdLookup.getLongBuffer() != null) {
- writeInternal(smoosher, dictionaryIdLookup.getLongBuffer(),
ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME);
+ if (dictionaryIdLookup.getLongBufferMapper() != null) {
+ copyFromTempSmoosh(smoosher, dictionaryIdLookup.getLongBufferMapper());
} else {
writeInternal(smoosher, dictionaryWriter,
ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME);
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarStringColumnSerializer.java
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarStringColumnSerializer.java
index 230a9433cdc..5a335a312b4 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarStringColumnSerializer.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarStringColumnSerializer.java
@@ -20,7 +20,6 @@
package org.apache.druid.segment.nested;
import org.apache.druid.common.config.NullHandling;
-import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
@@ -34,6 +33,7 @@ import org.apache.druid.segment.serde.ColumnSerializerUtils;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import javax.annotation.Nullable;
+import java.io.File;
import java.io.IOException;
/**
@@ -62,7 +62,7 @@ public class ScalarStringColumnSerializer extends
ScalarNestedCommonFormatColumn
}
@Override
- public void openDictionaryWriter() throws IOException
+ public void openDictionaryWriter(File segmentBaseDir) throws IOException
{
dictionaryWriter = StringEncodingStrategies.getStringDictionaryWriter(
indexSpec.getStringDictionaryEncoding(),
@@ -73,7 +73,7 @@ public class ScalarStringColumnSerializer extends
ScalarNestedCommonFormatColumn
dictionaryIdLookup = closer.register(
new DictionaryIdLookup(
name,
- FileUtils.getTempDir(),
+ segmentBaseDir,
dictionaryWriter,
null,
null,
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java
b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java
index 0d07822db54..9adb8fac2b7 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java
@@ -26,13 +26,11 @@ import it.unimi.dsi.fastutil.ints.IntIterator;
import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.collections.bitmap.MutableBitmap;
import org.apache.druid.common.config.NullHandling;
-import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
-import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.math.expr.ExprEval;
import org.apache.druid.math.expr.ExpressionType;
@@ -52,6 +50,7 @@ import org.apache.druid.segment.serde.ColumnSerializerUtils;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import javax.annotation.Nullable;
+import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@@ -133,7 +132,7 @@ public class VariantColumnSerializer extends
NestedCommonFormatColumnSerializer
}
@Override
- public void openDictionaryWriter() throws IOException
+ public void openDictionaryWriter(File segmentBaseDir) throws IOException
{
dictionaryWriter = StringEncodingStrategies.getStringDictionaryWriter(
indexSpec.getStringDictionaryEncoding(),
@@ -171,7 +170,7 @@ public class VariantColumnSerializer extends
NestedCommonFormatColumnSerializer
dictionaryIdLookup = closer.register(
new DictionaryIdLookup(
name,
- FileUtils.getTempDir(),
+ segmentBaseDir,
dictionaryWriter,
longDictionaryWriter,
doubleDictionaryWriter,
@@ -423,29 +422,22 @@ public class VariantColumnSerializer extends
NestedCommonFormatColumnSerializer
if (writeDictionary) {
if (dictionaryIdLookup.getStringBufferMapper() != null) {
- SmooshedFileMapper fileMapper =
dictionaryIdLookup.getStringBufferMapper();
- for (String internalName : fileMapper.getInternalFilenames()) {
- smoosher.add(internalName, fileMapper.mapFile(internalName));
- }
+ copyFromTempSmoosh(smoosher,
dictionaryIdLookup.getStringBufferMapper());
} else {
writeInternal(smoosher, dictionaryWriter,
ColumnSerializerUtils.STRING_DICTIONARY_FILE_NAME);
}
- if (dictionaryIdLookup.getLongBuffer() != null) {
- writeInternal(smoosher, dictionaryIdLookup.getLongBuffer(),
ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME);
+ if (dictionaryIdLookup.getLongBufferMapper() != null) {
+ copyFromTempSmoosh(smoosher, dictionaryIdLookup.getLongBufferMapper());
} else {
writeInternal(smoosher, longDictionaryWriter,
ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME);
}
- if (dictionaryIdLookup.getDoubleBuffer() != null) {
- writeInternal(
- smoosher,
- dictionaryIdLookup.getDoubleBuffer(),
- ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME
- );
+ if (dictionaryIdLookup.getDoubleBufferMapper() != null) {
+ copyFromTempSmoosh(smoosher,
dictionaryIdLookup.getDoubleBufferMapper());
} else {
writeInternal(smoosher, doubleDictionaryWriter,
ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME);
}
- if (dictionaryIdLookup.getArrayBuffer() != null) {
- writeInternal(smoosher, dictionaryIdLookup.getArrayBuffer(),
ColumnSerializerUtils.ARRAY_DICTIONARY_FILE_NAME);
+ if (dictionaryIdLookup.getArrayBufferMapper() != null) {
+ copyFromTempSmoosh(smoosher,
dictionaryIdLookup.getArrayBufferMapper());
} else {
writeInternal(smoosher, arrayDictionaryWriter,
ColumnSerializerUtils.ARRAY_DICTIONARY_FILE_NAME);
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/serde/ColumnSerializerUtils.java
b/processing/src/main/java/org/apache/druid/segment/serde/ColumnSerializerUtils.java
index 8396f07fd35..542441bb00b 100644
---
a/processing/src/main/java/org/apache/druid/segment/serde/ColumnSerializerUtils.java
+++
b/processing/src/main/java/org/apache/druid/segment/serde/ColumnSerializerUtils.java
@@ -25,10 +25,11 @@ import
com.fasterxml.jackson.dataformat.smile.SmileGenerator;
import org.apache.druid.guice.BuiltInTypesModule;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter;
+import java.io.File;
import java.io.IOException;
-import java.nio.ByteBuffer;
public class ColumnSerializerUtils
{
@@ -65,17 +66,31 @@ public class ColumnSerializerUtils
}
}
- public static void writeInternal(FileSmoosher smoosher, ByteBuffer buffer,
String columnName, String fileName)
- throws IOException
+ public static String getInternalFileName(String fileNameBase, String field)
{
- final String internalName = getInternalFileName(columnName, fileName);
- try (SmooshedWriter smooshChannel =
smoosher.addWithSmooshedWriter(internalName, buffer.capacity())) {
- smooshChannel.write(buffer);
- }
+ return fileNameBase + "." + field;
}
- public static String getInternalFileName(String fileNameBase, String field)
+ /**
+ * Writes a {@link Serializer} to a 'smoosh file' which contains the
contents of this single serializer, with the
+ * serializer writing to an internal file specified by the name argument,
returning a {@link SmooshedFileMapper}
+ */
+ public static SmooshedFileMapper mapSerializer(File smooshFile, Serializer
writer, String name)
{
- return fileNameBase + "." + field;
+ try (
+ final FileSmoosher smoosher = new FileSmoosher(smooshFile);
+ final SmooshedWriter smooshedWriter = smoosher.addWithSmooshedWriter(
+ name,
+ writer.getSerializedSize()
+ )
+ ) {
+ writer.writeTo(smooshedWriter, smoosher);
+ smooshedWriter.close();
+ smoosher.close();
+ return SmooshedFileMapper.load(smooshFile);
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/serde/Serializer.java
b/processing/src/main/java/org/apache/druid/segment/serde/Serializer.java
index 607d17195cd..b6889b148c2 100644
--- a/processing/src/main/java/org/apache/druid/segment/serde/Serializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/serde/Serializer.java
@@ -36,8 +36,11 @@ public interface Serializer
long getSerializedSize() throws IOException;
/**
- * Writes serialized form of this object to the given channel. If parallel
data streams are needed, they could be
- * created with the provided smoosher.
+ * Writes the serialized form of this object. The entire object may be
written to the provided channel, or the object
+ * may be split over the provided channel and files added to the {@link
FileSmoosher}, where additional channels can
+ * be created via {@link FileSmoosher#addWithSmooshedWriter(String, long)}.
The latter approach is useful when the
+ * serialized form of the object is too large for a single smoosh container.
At the time this javadoc was written,
+ * the max smoosh container size is limited to the max {@link
java.nio.ByteBuffer} size.
*/
void writeTo(WritableByteChannel channel, FileSmoosher smoosher) throws
IOException;
}
diff --git
a/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java
b/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java
index 7faa31d8e53..a6893756496 100644
--- a/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java
+++ b/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java
@@ -30,7 +30,9 @@ import it.unimi.dsi.fastutil.ints.IntIterator;
import org.apache.druid.collections.bitmap.RoaringBitmapFactory;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.ListBasedInputRow;
import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.data.input.impl.AggregateProjectionSpec;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionSchema.MultiValueHandling;
import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -39,17 +41,23 @@ import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.frame.testutil.FrameTestUtil;
import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
+import org.apache.druid.query.BitmapResultFactory;
import org.apache.druid.query.DefaultBitmapResultFactory;
import org.apache.druid.query.OrderBy;
+import org.apache.druid.query.QueryContext;
+import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnIndexSupplier;
+import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.DictionaryEncodedColumn;
+import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.StringUtf8DictionaryEncodedColumn;
import org.apache.druid.segment.data.BitmapSerdeFactory;
import org.apache.druid.segment.data.BitmapValues;
@@ -63,9 +71,11 @@ import
org.apache.druid.segment.incremental.IncrementalIndexAdapter;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.index.semantic.StringValueSetIndexes;
+import org.apache.druid.segment.index.semantic.ValueIndexes;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.SegmentId;
+import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Rule;
@@ -143,6 +153,7 @@ public class IndexMergerTestBase extends
InitializedNullHandlingTest
private final IndexSpec indexSpec;
private final IndexIO indexIO;
private final boolean useBitmapIndexes;
+ private final BitmapSerdeFactory serdeFactory;
@Rule
public final CloserRule closer = new CloserRule(false);
@@ -163,6 +174,7 @@ public class IndexMergerTestBase extends
InitializedNullHandlingTest
.withLongEncoding(longEncodingStrategy)
.build();
this.indexIO = TestHelper.getTestIndexIO();
+ this.serdeFactory = bitmapSerdeFactory;
this.useBitmapIndexes = bitmapSerdeFactory != null;
}
@@ -3031,6 +3043,229 @@ public class IndexMergerTestBase extends
InitializedNullHandlingTest
validateTestMaxColumnsToMergeOutputSegment(merged7);
}
+ @Test
+ public void testMergeProjections() throws IOException
+ {
+ File tmp = FileUtils.createTempDir();
+ closer.closeLater(tmp::delete);
+
+ final DateTime timestamp =
Granularities.DAY.bucket(DateTimes.nowUtc()).getStart();
+
+ final RowSignature rowSignature = RowSignature.builder()
+ .add("a", ColumnType.STRING)
+ .add("b", ColumnType.STRING)
+ .add("c", ColumnType.LONG)
+ .build();
+
+ final List<InputRow> rows1 = Arrays.asList(
+ new ListBasedInputRow(
+ rowSignature,
+ timestamp,
+ rowSignature.getColumnNames(),
+ Arrays.asList("a", "x", 1L)
+ ),
+ new ListBasedInputRow(
+ rowSignature,
+ timestamp.plusMinutes(1),
+ rowSignature.getColumnNames(),
+ Arrays.asList("b", "y", 2L)
+ ),
+ new ListBasedInputRow(
+ rowSignature,
+ timestamp.plusHours(2),
+ rowSignature.getColumnNames(),
+ Arrays.asList("a", "z", 3L)
+ )
+ );
+
+ final List<InputRow> rows2 = Arrays.asList(
+ new ListBasedInputRow(
+ rowSignature,
+ timestamp,
+ rowSignature.getColumnNames(),
+ Arrays.asList("b", "y", 1L)
+ ),
+ new ListBasedInputRow(
+ rowSignature,
+ timestamp.plusMinutes(1),
+ rowSignature.getColumnNames(),
+ Arrays.asList("d", "w", 2L)
+ ),
+ new ListBasedInputRow(
+ rowSignature,
+ timestamp.plusHours(2),
+ rowSignature.getColumnNames(),
+ Arrays.asList("b", "z", 3L)
+ )
+ );
+
+ final DimensionsSpec.Builder dimensionsBuilder =
+ DimensionsSpec.builder()
+ .setDimensions(
+ Arrays.asList(
+ new StringDimensionSchema("a"),
+ new StringDimensionSchema("b")
+ )
+ );
+
+ List<AggregateProjectionSpec> projections = Arrays.asList(
+ new AggregateProjectionSpec(
+ "a_hourly_c_sum",
+ VirtualColumns.create(
+ Granularities.toVirtualColumn(Granularities.HOUR, "__gran")
+ ),
+ Arrays.asList(
+ new StringDimensionSchema("a"),
+ new LongDimensionSchema("__gran")
+ ),
+ new AggregatorFactory[]{
+ new LongSumAggregatorFactory("c_sum", "c")
+ }
+ ),
+ new AggregateProjectionSpec(
+ "a_c_sum",
+ VirtualColumns.EMPTY,
+ Collections.singletonList(
+ new StringDimensionSchema("a")
+ ),
+ new AggregatorFactory[]{
+ new LongSumAggregatorFactory("c_sum", "c")
+ }
+ )
+ );
+
+ IndexBuilder bob = IndexBuilder.create()
+ .tmpDir(tmp)
+ .schema(
+ IncrementalIndexSchema.builder()
+
.withDimensionsSpec(dimensionsBuilder.build())
+ .withRollup(false)
+
.withProjections(projections)
+ .build()
+ )
+ .rows(rows1);
+
+ IndexBuilder bob2 = IndexBuilder.create()
+ .tmpDir(tmp)
+ .schema(
+ IncrementalIndexSchema.builder()
+
.withDimensionsSpec(dimensionsBuilder.build())
+
.withRollup(false)
+
.withProjections(projections)
+ .build()
+ )
+ .rows(rows2);
+
+ QueryableIndex q1 = bob.buildMMappedIndex();
+ QueryableIndex q2 = bob2.buildMMappedIndex();
+
+ QueryableIndex merged = indexIO.loadIndex(
+ indexMerger.merge(
+ Arrays.asList(
+ new QueryableIndexIndexableAdapter(q1),
+ new QueryableIndexIndexableAdapter(q2)
+ ),
+ true,
+ new AggregatorFactory[0],
+ temporaryFolder.newFolder(),
+ dimensionsBuilder.build(),
+ IndexSpec.DEFAULT,
+ -1
+ )
+ );
+
+ CursorBuildSpec p1Spec = CursorBuildSpec.builder()
+ .setQueryContext(
+ QueryContext.of(
+
ImmutableMap.of(QueryContexts.USE_PROJECTION, "a_hourly_c_sum")
+ )
+ )
+ .setVirtualColumns(
+ VirtualColumns.create(
+
Granularities.toVirtualColumn(Granularities.HOUR, "gran")
+ )
+ )
+ .setAggregators(
+ Collections.singletonList(
+ new
LongSumAggregatorFactory("c", "c")
+ )
+ )
+
.setGroupingColumns(Collections.singletonList("a"))
+ .build();
+ CursorBuildSpec p2Spec = CursorBuildSpec.builder()
+ .setQueryContext(
+ QueryContext.of(
+
ImmutableMap.of(QueryContexts.USE_PROJECTION, "a_c_sum")
+ )
+ )
+ .setAggregators(
+ Collections.singletonList(
+ new
LongSumAggregatorFactory("c", "c")
+ )
+ )
+
.setGroupingColumns(Collections.singletonList("a"))
+ .build();
+
+
+ QueryableIndexCursorFactory cursorFactory = new
QueryableIndexCursorFactory(merged);
+
+ try (final CursorHolder cursorHolder =
cursorFactory.makeCursorHolder(p1Spec)) {
+ final Cursor cursor = cursorHolder.asCursor();
+ int rowCount = 0;
+ while (!cursor.isDone()) {
+ rowCount++;
+ cursor.advance();
+ }
+ Assert.assertEquals(5, rowCount);
+ }
+
+ try (final CursorHolder cursorHolder =
cursorFactory.makeCursorHolder(p2Spec)) {
+ final Cursor cursor = cursorHolder.asCursor();
+ int rowCount = 0;
+ while (!cursor.isDone()) {
+ rowCount++;
+ cursor.advance();
+ }
+ Assert.assertEquals(3, rowCount);
+ }
+
+ QueryableIndex p1Index =
merged.getProjectionQueryableIndex("a_hourly_c_sum");
+ Assert.assertNotNull(p1Index);
+ ColumnHolder aHolder = p1Index.getColumnHolder("a");
+ DictionaryEncodedColumn aCol = (DictionaryEncodedColumn)
aHolder.getColumn();
+ Assert.assertEquals(3, aCol.getCardinality());
+
+ QueryableIndex p2Index = merged.getProjectionQueryableIndex("a_c_sum");
+ Assert.assertNotNull(p2Index);
+ ColumnHolder aHolder2 = p2Index.getColumnHolder("a");
+ DictionaryEncodedColumn aCol2 = (DictionaryEncodedColumn)
aHolder2.getColumn();
+ Assert.assertEquals(3, aCol2.getCardinality());
+
+ if (serdeFactory != null) {
+
+ BitmapResultFactory resultFactory = new
DefaultBitmapResultFactory(serdeFactory.getBitmapFactory());
+
+ Assert.assertEquals(
+ 2,
+ resultFactory.toImmutableBitmap(
+ aHolder.getIndexSupplier()
+ .as(ValueIndexes.class)
+ .forValue("a", ColumnType.STRING)
+ .computeBitmapResult(resultFactory, false)
+ ).size()
+ );
+
+ Assert.assertEquals(
+ 1,
+ resultFactory.toImmutableBitmap(
+ aHolder2.getIndexSupplier()
+ .as(ValueIndexes.class)
+ .forValue("a", ColumnType.STRING)
+ .computeBitmapResult(resultFactory, false)
+ ).size()
+ );
+ }
+ }
private QueryableIndex persistAndLoad(List<DimensionSchema> schema,
InputRow... rows) throws IOException
{
diff --git
a/processing/src/test/java/org/apache/druid/segment/nested/DictionaryIdLookupTest.java
b/processing/src/test/java/org/apache/druid/segment/nested/DictionaryIdLookupTest.java
index bba9a639803..d12ffc07f74 100644
---
a/processing/src/test/java/org/apache/druid/segment/nested/DictionaryIdLookupTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/nested/DictionaryIdLookupTest.java
@@ -38,7 +38,6 @@ import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.nio.ByteOrder;
-import java.nio.file.Path;
public class DictionaryIdLookupTest extends InitializedNullHandlingTest
{
@@ -92,12 +91,12 @@ public class DictionaryIdLookupTest extends
InitializedNullHandlingTest
4
);
- Path dictTempPath = temp.newFolder().toPath();
+ File dictTempDir = temp.newFolder();
// make lookup with references to writers
DictionaryIdLookup idLookup = new DictionaryIdLookup(
"test",
- dictTempPath,
+ dictTempDir,
stringWriter,
longWriter,
doubleWriter,
@@ -110,7 +109,7 @@ public class DictionaryIdLookupTest extends
InitializedNullHandlingTest
doubleWriter.open();
arrayWriter.open();
- File tempDir = dictTempPath.toFile();
+ File tempDir = dictTempDir;
Assert.assertEquals(0, tempDir.listFiles().length);
for (String s : sortedValueDictionary.getSortedStrings()) {
diff --git
a/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierTest.java
b/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierTest.java
index 64ba2679f04..87e457ef54e 100644
---
a/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierTest.java
@@ -197,7 +197,7 @@ public class NestedDataColumnSupplierTest extends
InitializedNullHandlingTest
SortedValueDictionary globalDictionarySortedCollector =
mergable.getValueDictionary();
mergable.mergeFieldsInto(sortedFields);
- serializer.openDictionaryWriter();
+ serializer.openDictionaryWriter(tempFolder.newFolder());
serializer.serializeFields(sortedFields);
serializer.serializeDictionaries(
globalDictionarySortedCollector.getSortedStrings(),
diff --git
a/processing/src/test/java/org/apache/druid/segment/nested/ScalarDoubleColumnSupplierTest.java
b/processing/src/test/java/org/apache/druid/segment/nested/ScalarDoubleColumnSupplierTest.java
index f483e297be0..ccc425b1700 100644
---
a/processing/src/test/java/org/apache/druid/segment/nested/ScalarDoubleColumnSupplierTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/nested/ScalarDoubleColumnSupplierTest.java
@@ -150,7 +150,7 @@ public class ScalarDoubleColumnSupplierTest extends
InitializedNullHandlingTest
SortedValueDictionary globalDictionarySortedCollector =
mergable.getValueDictionary();
mergable.mergeFieldsInto(sortedFields);
- serializer.openDictionaryWriter();
+ serializer.openDictionaryWriter(tempFolder.newFolder());
serializer.serializeDictionaries(
globalDictionarySortedCollector.getSortedStrings(),
globalDictionarySortedCollector.getSortedLongs(),
diff --git
a/processing/src/test/java/org/apache/druid/segment/nested/ScalarLongColumnSupplierTest.java
b/processing/src/test/java/org/apache/druid/segment/nested/ScalarLongColumnSupplierTest.java
index c8830f3aefd..2d40b2dd58f 100644
---
a/processing/src/test/java/org/apache/druid/segment/nested/ScalarLongColumnSupplierTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/nested/ScalarLongColumnSupplierTest.java
@@ -150,7 +150,7 @@ public class ScalarLongColumnSupplierTest extends
InitializedNullHandlingTest
SortedValueDictionary globalDictionarySortedCollector =
mergable.getValueDictionary();
mergable.mergeFieldsInto(sortedFields);
- serializer.openDictionaryWriter();
+ serializer.openDictionaryWriter(tempFolder.newFolder());
serializer.serializeDictionaries(
globalDictionarySortedCollector.getSortedStrings(),
globalDictionarySortedCollector.getSortedLongs(),
diff --git
a/processing/src/test/java/org/apache/druid/segment/nested/ScalarStringColumnSupplierTest.java
b/processing/src/test/java/org/apache/druid/segment/nested/ScalarStringColumnSupplierTest.java
index d72970b3b12..110e5d68b83 100644
---
a/processing/src/test/java/org/apache/druid/segment/nested/ScalarStringColumnSupplierTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/nested/ScalarStringColumnSupplierTest.java
@@ -150,7 +150,7 @@ public class ScalarStringColumnSupplierTest extends
InitializedNullHandlingTest
SortedValueDictionary globalDictionarySortedCollector =
mergable.getValueDictionary();
mergable.mergeFieldsInto(sortedFields);
- serializer.openDictionaryWriter();
+ serializer.openDictionaryWriter(tempFolder.newFolder());
serializer.serializeDictionaries(
globalDictionarySortedCollector.getSortedStrings(),
globalDictionarySortedCollector.getSortedLongs(),
diff --git
a/processing/src/test/java/org/apache/druid/segment/nested/VariantColumnSupplierTest.java
b/processing/src/test/java/org/apache/druid/segment/nested/VariantColumnSupplierTest.java
index 6aea2ace234..6ab748d6111 100644
---
a/processing/src/test/java/org/apache/druid/segment/nested/VariantColumnSupplierTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/nested/VariantColumnSupplierTest.java
@@ -275,7 +275,7 @@ public class VariantColumnSupplierTest extends
InitializedNullHandlingTest
closer
);
- serializer.openDictionaryWriter();
+ serializer.openDictionaryWriter(tempFolder.newFolder());
serializer.serializeDictionaries(
globalDictionarySortedCollector.getSortedStrings(),
globalDictionarySortedCollector.getSortedLongs(),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]