This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 02b2057 extract generic dictionary encoded column indexing and merging stuffs (#11829)
02b2057 is described below
commit 02b205737124f42f5cdebe7beec31dc66cfc9a8a
Author: Clint Wylie <[email protected]>
AuthorDate: Fri Oct 22 17:31:22 2021 -0700
extract generic dictionary encoded column indexing and merging stuffs (#11829)
* extract generic dictionary encoded column indexing and merging stuffs to pave the path towards supporting other types of dictionary encoded columns
* spotbugs and inspections fixes
* friendlier
* javadoc
* better name
* adjust
---
.../segment/DictionaryEncodedColumnIndexer.java | 261 +++++++++
...rV9.java => DictionaryEncodedColumnMerger.java} | 376 ++++++-------
.../druid/segment/DictionaryMergingIterator.java | 177 ++++++
.../apache/druid/segment/DimensionDictionary.java | 163 ++++++
.../java/org/apache/druid/segment/IndexMerger.java | 207 -------
.../druid/segment/SortedDimensionDictionary.java | 73 +++
.../druid/segment/StringDimensionIndexer.java | 404 +------------
.../druid/segment/StringDimensionMergerV9.java | 626 ++-------------------
.../apache/druid/segment/data/GenericIndexed.java | 2 +-
.../druid/segment/data/GenericIndexedWriter.java | 4 +-
.../segment/serde/ComplexColumnPartSerde.java | 5 +-
...est.java => DictionaryMergingIteratorTest.java} | 9 +-
.../apache/druid/segment/IndexMergerTestBase.java | 28 -
13 files changed, 919 insertions(+), 1416 deletions(-)
diff --git a/processing/src/main/java/org/apache/druid/segment/DictionaryEncodedColumnIndexer.java b/processing/src/main/java/org/apache/druid/segment/DictionaryEncodedColumnIndexer.java
new file mode 100644
index 0000000..17202f5
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/DictionaryEncodedColumnIndexer.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.base.Predicate;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.filter.ValueMatcher;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.data.CloseableIndexed;
+import org.apache.druid.segment.data.IndexedInts;
+import org.apache.druid.segment.data.IndexedIterable;
+import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexRowHolder;
+
+import javax.annotation.Nullable;
+import java.util.Iterator;
+
+/**
+ * Basic structure for indexing dictionary encoded columns.
+ *
+ * Values are stored in {@link #dimLookup}, an unsorted dictionary whose ids follow insertion order. A sorted view of
+ * it ({@link #sortedLookup}) is built on first use by {@link #sortedLookup()} and is used to translate between
+ * unsorted ("intermediate") ids and sorted ids.
+ *
+ * @param <KeyType>    encoded key type, passed through unchanged to {@link DimensionIndexer}
+ * @param <ActualType> type of the values held in the dictionary
+ */
+public abstract class DictionaryEncodedColumnIndexer<KeyType, ActualType extends Comparable<ActualType>>
+    implements DimensionIndexer<Integer, KeyType, ActualType>
+{
+  protected final DimensionDictionary<ActualType> dimLookup;
+
+  /**
+   * Set by {@link #setSparseIndexed()}; when true, not every row is guaranteed to have explicitly processed a value
+   * for this column (see {@link #dictionaryEncodesAllValues()}).
+   */
+  protected volatile boolean isSparse = false;
+
+  /**
+   * Sorted view of {@link #dimLookup}, built lazily; use {@link #sortedLookup()} instead of reading this directly.
+   */
+  @Nullable
+  protected SortedDimensionDictionary<ActualType> sortedLookup;
+
+  public DictionaryEncodedColumnIndexer()
+  {
+    this.dimLookup = new DimensionDictionary<>();
+  }
+
+  @Override
+  public void setSparseIndexed()
+  {
+    isSparse = true;
+  }
+
+  /**
+   * Translates an unsorted dictionary id into the corresponding sorted dictionary id.
+   */
+  public int getSortedEncodedValueFromUnsorted(Integer unsortedIntermediateValue)
+  {
+    return sortedLookup().getSortedIdFromUnsortedId(unsortedIntermediateValue);
+  }
+
+  @Override
+  public Integer getUnsortedEncodedValueFromSorted(Integer sortedIntermediateValue)
+  {
+    return sortedLookup().getUnsortedIdFromSortedId(sortedIntermediateValue);
+  }
+
+  /**
+   * Returns a view of the dictionary values ordered by sorted id. {@code indexOf} on the view returns the sorted id of
+   * a value, or {@link DimensionDictionary#ABSENT_VALUE_ID} if the value is not in the dictionary.
+   */
+  @Override
+  public CloseableIndexed<ActualType> getSortedIndexedValues()
+  {
+    return new CloseableIndexed<ActualType>()
+    {
+      @Override
+      public int size()
+      {
+        return getCardinality();
+      }
+
+      @Override
+      public ActualType get(int index)
+      {
+        return getActualValue(index, true);
+      }
+
+      @Override
+      public int indexOf(ActualType value)
+      {
+        int id = getEncodedValue(value, false);
+        return id < 0 ? DimensionDictionary.ABSENT_VALUE_ID : getSortedEncodedValueFromUnsorted(id);
+      }
+
+      @Override
+      public Iterator<ActualType> iterator()
+      {
+        return IndexedIterable.create(this).iterator();
+      }
+
+      @Override
+      public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+      {
+        // nothing to inspect
+      }
+
+      @Override
+      public void close()
+      {
+        // nothing to close
+      }
+    };
+  }
+
+  @Override
+  public ActualType getMinValue()
+  {
+    return dimLookup.getMinValue();
+  }
+
+  @Override
+  public ActualType getMaxValue()
+  {
+    return dimLookup.getMaxValue();
+  }
+
+  @Override
+  public int getCardinality()
+  {
+    return dimLookup.size();
+  }
+
+  @Override
+  public ColumnValueSelector<?> makeColumnValueSelector(
+      IncrementalIndexRowHolder currEntry,
+      IncrementalIndex.DimensionDesc desc
+  )
+  {
+    return makeDimensionSelector(DefaultDimensionSpec.of(desc.getName()), currEntry, desc);
+  }
+
+  /**
+   * Wraps a selector whose rows contain unsorted dictionary ids so that its rows report sorted ids instead. The
+   * argument is cast to {@link DimensionSelector}. The returned selector supports row access, value cardinality and
+   * object access (delegated unchanged to the wrapped selector); value matchers, name lookups and id lookups throw
+   * {@link UnsupportedOperationException}.
+   */
+  @Override
+  public ColumnValueSelector convertUnsortedValuesToSorted(ColumnValueSelector selectorWithUnsortedValues)
+  {
+    DimensionSelector dimSelectorWithUnsortedValues = (DimensionSelector) selectorWithUnsortedValues;
+    class SortedDimensionSelector implements DimensionSelector, IndexedInts
+    {
+      @Override
+      public int size()
+      {
+        return dimSelectorWithUnsortedValues.getRow().size();
+      }
+
+      @Override
+      public int get(int index)
+      {
+        return sortedLookup().getSortedIdFromUnsortedId(dimSelectorWithUnsortedValues.getRow().get(index));
+      }
+
+      @Override
+      public IndexedInts getRow()
+      {
+        return this;
+      }
+
+      @Override
+      public ValueMatcher makeValueMatcher(@Nullable String value)
+      {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public ValueMatcher makeValueMatcher(Predicate<String> predicate)
+      {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public int getValueCardinality()
+      {
+        return dimSelectorWithUnsortedValues.getValueCardinality();
+      }
+
+      @Nullable
+      @Override
+      public String lookupName(int id)
+      {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public boolean nameLookupPossibleInAdvance()
+      {
+        throw new UnsupportedOperationException();
+      }
+
+      @Nullable
+      @Override
+      public IdLookup idLookup()
+      {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+      {
+        inspector.visit("dimSelectorWithUnsortedValues", dimSelectorWithUnsortedValues);
+      }
+
+      @Nullable
+      @Override
+      public Object getObject()
+      {
+        return dimSelectorWithUnsortedValues.getObject();
+      }
+
+      @Override
+      public Class classOfObject()
+      {
+        return dimSelectorWithUnsortedValues.classOfObject();
+      }
+    }
+    return new SortedDimensionSelector();
+  }
+
+  /**
+   * Returns true if all values are encoded in {@link #dimLookup}.
+   */
+  protected boolean dictionaryEncodesAllValues()
+  {
+    // name lookup is possible in advance if we explicitly process a value for every row, or if we've encountered an
+    // actual null value and it is present in our dictionary. otherwise the dictionary will be missing ids for implicit
+    // null values
+    return !isSparse || dimLookup.getIdForNull() != DimensionDictionary.ABSENT_VALUE_ID;
+  }
+
+  /**
+   * Returns the sorted view of {@link #dimLookup}, building and caching it on the first call.
+   *
+   * NOTE(review): once cached, this class never rebuilds the view if more values are added to {@link #dimLookup}, and
+   * the lazy initialization is not synchronized; presumably sorting only happens after indexing is done — confirm.
+   */
+  protected SortedDimensionDictionary<ActualType> sortedLookup()
+  {
+    if (sortedLookup == null) {
+      sortedLookup = dimLookup.sort();
+    }
+    return sortedLookup;
+  }
+
+  /**
+   * Returns the value for the given id, treating the id as a sorted id if {@code idSorted} is true and as an unsorted
+   * id otherwise.
+   */
+  @Nullable
+  protected ActualType getActualValue(int intermediateValue, boolean idSorted)
+  {
+    if (idSorted) {
+      return sortedLookup().getValueFromSortedId(intermediateValue);
+    } else {
+      return dimLookup.getValue(intermediateValue);
+    }
+  }
+
+  /**
+   * Returns the dictionary id of the given value, as a sorted id if {@code idSorted} is true and as an unsorted id
+   * otherwise. A negative unsorted id (the dictionary's absent-value marker, cf.
+   * {@link DimensionDictionary#ABSENT_VALUE_ID}) is returned unchanged in both cases.
+   */
+  protected int getEncodedValue(@Nullable ActualType fullValue, boolean idSorted)
+  {
+    final int unsortedId = dimLookup.getId(fullValue);
+
+    // an absent value has no sorted counterpart, so never feed the negative sentinel into the sorted lookup
+    if (idSorted && unsortedId >= 0) {
+      return sortedLookup().getSortedIdFromUnsortedId(unsortedId);
+    } else {
+      return unsortedId;
+    }
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java b/processing/src/main/java/org/apache/druid/segment/DictionaryEncodedColumnMerger.java
similarity index 79%
copy from processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java
copy to processing/src/main/java/org/apache/druid/segment/DictionaryEncodedColumnMerger.java
index abb4637..b78bfd9 100644
--- a/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java
+++ b/processing/src/main/java/org/apache/druid/segment/DictionaryEncodedColumnMerger.java
@@ -20,45 +20,36 @@
package org.apache.druid.segment;
import com.google.common.base.Predicate;
-import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
+import com.google.common.collect.PeekingIterator;
import it.unimi.dsi.fastutil.ints.IntIterable;
import it.unimi.dsi.fastutil.ints.IntIterator;
import org.apache.druid.collections.bitmap.BitmapFactory;
import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.collections.bitmap.MutableBitmap;
-import org.apache.druid.collections.spatial.ImmutableRTree;
-import org.apache.druid.collections.spatial.RTree;
-import org.apache.druid.collections.spatial.split.LinearGutmanSplitStrategy;
-import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.filter.ValueMatcher;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.column.ColumnCapabilities;
-import org.apache.druid.segment.column.ColumnDescriptor;
-import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.BitmapSerdeFactory;
import org.apache.druid.segment.data.BitmapValues;
-import org.apache.druid.segment.data.ByteBufferWriter;
import org.apache.druid.segment.data.CloseableIndexed;
import org.apache.druid.segment.data.ColumnarIntsSerializer;
import org.apache.druid.segment.data.ColumnarMultiIntsSerializer;
import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSerializer;
import org.apache.druid.segment.data.CompressionStrategy;
-import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.data.GenericIndexedWriter;
-import org.apache.druid.segment.data.ImmutableRTreeObjectStrategy;
import org.apache.druid.segment.data.Indexed;
import org.apache.druid.segment.data.IndexedInts;
-import org.apache.druid.segment.data.ListIndexed;
+import org.apache.druid.segment.data.ObjectStrategy;
import org.apache.druid.segment.data.SingleValueColumnarIntsSerializer;
import
org.apache.druid.segment.data.V3CompressedVSizeColumnarMultiIntsSerializer;
import org.apache.druid.segment.data.VSizeColumnarIntsSerializer;
import org.apache.druid.segment.data.VSizeColumnarMultiIntsSerializer;
-import org.apache.druid.segment.serde.DictionaryEncodedColumnPartSerde;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import javax.annotation.Nonnull;
@@ -67,48 +58,46 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.IntBuffer;
import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
-public class StringDimensionMergerV9 implements DimensionMergerV9
+/**
+ * Base structure for merging dictionary encoded columns
+ */
+public abstract class DictionaryEncodedColumnMerger<T extends Comparable<T>>
implements DimensionMergerV9
{
- private static final Logger log = new Logger(StringDimensionMergerV9.class);
-
- private static final Indexed<String> NULL_STR_DIM_VAL = new
ListIndexed<>(Collections.singletonList(null));
- private static final Splitter SPLITTER = Splitter.on(",");
+ private static final Logger log = new
Logger(DictionaryEncodedColumnMerger.class);
- private final String dimensionName;
- private final ProgressIndicator progress;
- private final Closer closer;
- private final IndexSpec indexSpec;
- private final SegmentWriteOutMedium segmentWriteOutMedium;
- private final MutableBitmap nullRowsBitmap;
- private final ColumnCapabilities capabilities;
+ protected final String dimensionName;
+ protected final ProgressIndicator progress;
+ protected final Closer closer;
+ protected final IndexSpec indexSpec;
+ protected final SegmentWriteOutMedium segmentWriteOutMedium;
+ protected final MutableBitmap nullRowsBitmap;
+ protected final ColumnCapabilities capabilities;
- private int dictionarySize;
- private int rowCount = 0;
- private int cardinality = 0;
- private boolean hasNull = false;
+ protected int dictionarySize;
+ protected int rowCount = 0;
+ protected int cardinality = 0;
+ protected boolean hasNull = false;
@Nullable
- private GenericIndexedWriter<ImmutableBitmap> bitmapWriter;
+ protected GenericIndexedWriter<ImmutableBitmap> bitmapWriter;
@Nullable
- private ByteBufferWriter<ImmutableRTree> spatialWriter;
+ protected ArrayList<IntBuffer> dimConversions;
@Nullable
- private ArrayList<IntBuffer> dimConversions;
+ protected List<IndexableAdapter> adapters;
@Nullable
- private List<IndexableAdapter> adapters;
+ protected DictionaryMergingIterator<T> dictionaryMergeIterator;
@Nullable
- private IndexMerger.DictionaryMergeIterator dictionaryMergeIterator;
+ protected ColumnarIntsSerializer encodedValueSerializer;
@Nullable
- private ColumnarIntsSerializer encodedValueSerializer;
+ protected GenericIndexedWriter<T> dictionaryWriter;
@Nullable
- private GenericIndexedWriter<String> dictionaryWriter;
- @Nullable
- private String firstDictionaryValue;
+ protected T firstDictionaryValue;
- public StringDimensionMergerV9(
+ public DictionaryEncodedColumnMerger(
String dimensionName,
IndexSpec indexSpec,
SegmentWriteOutMedium segmentWriteOutMedium,
@@ -121,12 +110,18 @@ public class StringDimensionMergerV9 implements DimensionMergerV9
this.indexSpec = indexSpec;
this.capabilities = capabilities;
this.segmentWriteOutMedium = segmentWriteOutMedium;
- nullRowsBitmap =
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
+ this.nullRowsBitmap =
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
this.progress = progress;
this.closer = closer;
}
+ protected abstract Comparator<Pair<Integer, PeekingIterator<T>>>
getDictionaryMergingComparator();
+ protected abstract Indexed<T> getNullDimValue();
+ protected abstract ObjectStrategy<T> getObjectStrategy();
+ @Nullable
+ protected abstract T coerceValue(T value);
+
@Override
public void writeMergedValueDictionary(List<IndexableAdapter> adapters)
throws IOException
{
@@ -143,11 +138,11 @@ public class StringDimensionMergerV9 implements DimensionMergerV9
}
int numMergeIndex = 0;
- Indexed<String> dimValueLookup = null;
- Indexed<String>[] dimValueLookups = new Indexed[adapters.size() + 1];
+ Indexed<T> dimValueLookup = null;
+ Indexed<T>[] dimValueLookups = new Indexed[adapters.size() + 1];
for (int i = 0; i < adapters.size(); i++) {
@SuppressWarnings("MustBeClosedChecker") // we register dimValues in the
closer
- Indexed<String> dimValues =
closer.register(adapters.get(i).getDimValueLookup(dimensionName));
+ Indexed<T> dimValues =
closer.register(adapters.get(i).getDimValueLookup(dimensionName));
if (dimValues != null && !allNull(dimValues)) {
dimHasValues = true;
hasNull |= dimValues.indexOf(null) >= 0;
@@ -169,19 +164,23 @@ public class StringDimensionMergerV9 implements DimensionMergerV9
*/
if (convertMissingValues && !hasNull) {
hasNull = true;
- dimValueLookups[adapters.size()] = dimValueLookup = NULL_STR_DIM_VAL;
+ dimValueLookups[adapters.size()] = dimValueLookup = getNullDimValue();
numMergeIndex++;
}
String dictFilename = StringUtils.format("%s.dim_values", dimensionName);
- dictionaryWriter = new GenericIndexedWriter<>(segmentWriteOutMedium,
dictFilename, GenericIndexed.STRING_STRATEGY);
+ dictionaryWriter = new GenericIndexedWriter<>(segmentWriteOutMedium,
dictFilename, getObjectStrategy());
firstDictionaryValue = null;
dictionarySize = 0;
dictionaryWriter.open();
cardinality = 0;
if (numMergeIndex > 1) {
- dictionaryMergeIterator = new
IndexMerger.DictionaryMergeIterator(dimValueLookups, true);
+ dictionaryMergeIterator = new DictionaryMergingIterator<>(
+ dimValueLookups,
+ getDictionaryMergingComparator(),
+ true
+ );
writeDictionary(() -> dictionaryMergeIterator);
for (int i = 0; i < adapters.size(); i++) {
if (dimValueLookups[i] != null &&
dictionaryMergeIterator.needConversion(i)) {
@@ -204,51 +203,6 @@ public class StringDimensionMergerV9 implements DimensionMergerV9
setupEncodedValueWriter();
}
- private void writeDictionary(Iterable<String> dictionaryValues) throws
IOException
- {
- for (String value : dictionaryValues) {
- dictionaryWriter.write(value);
- value = NullHandling.emptyToNullIfNeeded(value);
- if (dictionarySize == 0) {
- firstDictionaryValue = value;
- }
- dictionarySize++;
- }
- }
-
- protected void setupEncodedValueWriter() throws IOException
- {
- final CompressionStrategy compressionStrategy =
indexSpec.getDimensionCompression();
-
- String filenameBase = StringUtils.format("%s.forward_dim", dimensionName);
- if (capabilities.hasMultipleValues().isTrue()) {
- if (compressionStrategy != CompressionStrategy.UNCOMPRESSED) {
- encodedValueSerializer =
V3CompressedVSizeColumnarMultiIntsSerializer.create(
- dimensionName,
- segmentWriteOutMedium,
- filenameBase,
- cardinality,
- compressionStrategy
- );
- } else {
- encodedValueSerializer =
- new VSizeColumnarMultiIntsSerializer(dimensionName,
segmentWriteOutMedium, cardinality);
- }
- } else {
- if (compressionStrategy != CompressionStrategy.UNCOMPRESSED) {
- encodedValueSerializer = CompressedVSizeColumnarIntsSerializer.create(
- dimensionName,
- segmentWriteOutMedium,
- filenameBase,
- cardinality,
- compressionStrategy
- );
- } else {
- encodedValueSerializer = new
VSizeColumnarIntsSerializer(segmentWriteOutMedium, cardinality);
- }
- }
- encodedValueSerializer.open();
- }
@Override
public ColumnValueSelector convertSortedSegmentRowValuesToMergedRowValues(
@@ -371,30 +325,6 @@ public class StringDimensionMergerV9 implements DimensionMergerV9
rowCount++;
}
- private static IndexedInts getRow(ColumnValueSelector s)
- {
- if (s instanceof DimensionSelector) {
- return ((DimensionSelector) s).getRow();
- } else if (s instanceof NilColumnValueSelector) {
- return IndexedInts.empty();
- } else {
- throw new ISE(
- "ColumnValueSelector[%s], only DimensionSelector or
NilColumnValueSelector is supported",
- s.getClass()
- );
- }
- }
-
- private static boolean isNullRow(IndexedInts row, int size)
- {
- for (int i = 0; i < size; i++) {
- if (row.get(i) != 0) {
- return false;
- }
- }
- return true;
- }
-
@Override
public void writeIndexes(@Nullable List<IntBuffer> segmentRowNumConversions)
throws IOException
{
@@ -416,15 +346,10 @@ public class StringDimensionMergerV9 implements DimensionMergerV9
BitmapFactory bitmapFactory = bitmapSerdeFactory.getBitmapFactory();
- RTree tree = null;
- boolean hasSpatial = capabilities.hasSpatialIndexes();
- if (hasSpatial) {
- spatialWriter = new ByteBufferWriter<>(
- segmentWriteOutMedium,
- new ImmutableRTreeObjectStrategy(bitmapFactory)
- );
- spatialWriter.open();
- tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50, bitmapFactory),
bitmapFactory);
+ ExtendedIndexesMerger extendedIndexesMerger = getExtendedIndexesMerger();
+
+ if (extendedIndexesMerger != null) {
+ extendedIndexesMerger.initialize();
}
IndexSeeker[] dictIdSeeker = toIndexSeekers(adapters, dimConversions,
dimensionName);
@@ -432,18 +357,19 @@ public class StringDimensionMergerV9 implements DimensionMergerV9
//Iterate all dim values's dictionary id in ascending order which in line
with dim values's compare result.
for (int dictId = 0; dictId < dictionarySize; dictId++) {
progress.progress();
- mergeBitmaps(
+ MutableBitmap mergedIndexes = mergeBitmaps(
segmentRowNumConversions,
bitmapFactory,
- tree,
- hasSpatial,
dictIdSeeker,
dictId
);
+ if (extendedIndexesMerger != null) {
+ extendedIndexesMerger.mergeIndexes(dictId, mergedIndexes);
+ }
}
- if (hasSpatial) {
- spatialWriter.write(ImmutableRTree.newImmutableFromMutable(tree));
+ if (extendedIndexesMerger != null) {
+ extendedIndexesMerger.write();
}
log.debug(
@@ -458,11 +384,63 @@ public class StringDimensionMergerV9 implements DimensionMergerV9
}
}
- void mergeBitmaps(
+
+
+ @Nullable
+ protected ExtendedIndexesMerger getExtendedIndexesMerger()
+ {
+ return null;
+ }
+
+ protected void setupEncodedValueWriter() throws IOException
+ {
+ final CompressionStrategy compressionStrategy =
indexSpec.getDimensionCompression();
+
+ String filenameBase = StringUtils.format("%s.forward_dim", dimensionName);
+ if (capabilities.hasMultipleValues().isTrue()) {
+ if (compressionStrategy != CompressionStrategy.UNCOMPRESSED) {
+ encodedValueSerializer =
V3CompressedVSizeColumnarMultiIntsSerializer.create(
+ dimensionName,
+ segmentWriteOutMedium,
+ filenameBase,
+ cardinality,
+ compressionStrategy
+ );
+ } else {
+ encodedValueSerializer =
+ new VSizeColumnarMultiIntsSerializer(dimensionName,
segmentWriteOutMedium, cardinality);
+ }
+ } else {
+ if (compressionStrategy != CompressionStrategy.UNCOMPRESSED) {
+ encodedValueSerializer = CompressedVSizeColumnarIntsSerializer.create(
+ dimensionName,
+ segmentWriteOutMedium,
+ filenameBase,
+ cardinality,
+ compressionStrategy
+ );
+ } else {
+ encodedValueSerializer = new
VSizeColumnarIntsSerializer(segmentWriteOutMedium, cardinality);
+ }
+ }
+ encodedValueSerializer.open();
+ }
+
+ protected void writeDictionary(Iterable<T> dictionaryValues) throws
IOException
+ {
+ for (T value : dictionaryValues) {
+ dictionaryWriter.write(value);
+ value = coerceValue(value);
+ if (dictionarySize == 0) {
+ firstDictionaryValue = value;
+ }
+ dictionarySize++;
+ }
+ }
+
+ protected MutableBitmap mergeBitmaps(
@Nullable List<IntBuffer> segmentRowNumConversions,
BitmapFactory bmpFactory,
- RTree tree,
- boolean hasSpatial,
IndexSeeker[] dictIdSeeker,
int dictId
) throws IOException
@@ -510,17 +488,7 @@ public class StringDimensionMergerV9 implements DimensionMergerV9
bitmapWriter.write(bmpFactory.makeImmutableBitmap(mergedIndexes));
- if (hasSpatial) {
- String dimVal = dictionaryWriter.get(dictId);
- if (dimVal != null) {
- List<String> stringCoords = Lists.newArrayList(SPLITTER.split(dimVal));
- float[] coords = new float[stringCoords.size()];
- for (int j = 0; j < coords.length; j++) {
- coords[j] = Float.valueOf(stringCoords.get(j));
- }
- tree.insert(coords, mergedIndexes);
- }
- }
+ return mergedIndexes;
}
@Override
@@ -529,33 +497,62 @@ public class StringDimensionMergerV9 implements DimensionMergerV9
return cardinality == 0;
}
- @Override
- public ColumnDescriptor makeColumnDescriptor()
+
+ protected IndexSeeker[] toIndexSeekers(
+ List<IndexableAdapter> adapters,
+ ArrayList<IntBuffer> dimConversions,
+ String dimension
+ )
{
- // Now write everything
- boolean hasMultiValue = capabilities.hasMultipleValues().isTrue();
- final CompressionStrategy compressionStrategy =
indexSpec.getDimensionCompression();
- final BitmapSerdeFactory bitmapSerdeFactory =
indexSpec.getBitmapSerdeFactory();
+ IndexSeeker[] seekers = new IndexSeeker[adapters.size()];
+ for (int i = 0; i < adapters.size(); i++) {
+ IntBuffer dimConversion = dimConversions.get(i);
+ if (dimConversion != null) {
+ seekers[i] = new IndexSeekerWithConversion((IntBuffer)
dimConversion.asReadOnlyBuffer().rewind());
+ } else {
+ try (CloseableIndexed<String> dimValueLookup =
adapters.get(i).getDimValueLookup(dimension)) {
+ seekers[i] = new IndexSeekerWithoutConversion(dimValueLookup == null
? 0 : dimValueLookup.size());
+ }
+ catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+ }
+ return seekers;
+ }
- final ColumnDescriptor.Builder builder = ColumnDescriptor.builder();
- builder.setValueType(ValueType.STRING);
- builder.setHasMultipleValues(hasMultiValue);
- final DictionaryEncodedColumnPartSerde.SerializerBuilder partBuilder =
DictionaryEncodedColumnPartSerde
- .serializerBuilder()
- .withDictionary(dictionaryWriter)
- .withValue(
- encodedValueSerializer,
- hasMultiValue,
- compressionStrategy != CompressionStrategy.UNCOMPRESSED
- )
- .withBitmapSerdeFactory(bitmapSerdeFactory)
- .withBitmapIndex(bitmapWriter)
- .withSpatialIndex(spatialWriter)
- .withByteOrder(IndexIO.BYTE_ORDER);
-
- return builder
- .addSerde(partBuilder.build())
- .build();
+ private boolean allNull(Indexed<T> dimValues)
+ {
+ for (int i = 0, size = dimValues.size(); i < size; i++) {
+ if (dimValues.get(i) != null) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private static IndexedInts getRow(ColumnValueSelector s)
+ {
+ if (s instanceof DimensionSelector) {
+ return ((DimensionSelector) s).getRow();
+ } else if (s instanceof NilColumnValueSelector) {
+ return IndexedInts.empty();
+ } else {
+ throw new ISE(
+ "ColumnValueSelector[%s], only DimensionSelector or
NilColumnValueSelector is supported",
+ s.getClass()
+ );
+ }
+ }
+
+ private static boolean isNullRow(IndexedInts row, int size)
+ {
+ for (int i = 0; i < size; i++) {
+ if (row.get(i) != 0) {
+ return false;
+ }
+ }
+ return true;
}
protected interface IndexSeeker
@@ -677,36 +674,21 @@ public class StringDimensionMergerV9 implements DimensionMergerV9
}
}
- protected IndexSeeker[] toIndexSeekers(
- List<IndexableAdapter> adapters,
- ArrayList<IntBuffer> dimConversions,
- String dimension
- )
+ /**
+ * Specifies any additional per value indexes which should be constructed
when
+ * {@link DictionaryEncodedColumnMerger#writeIndexes(List)} is called, on
top of the standard bitmap index created
+ * with {@link DictionaryEncodedColumnMerger#mergeBitmaps}
+ */
+ interface ExtendedIndexesMerger
{
- IndexSeeker[] seekers = new IndexSeeker[adapters.size()];
- for (int i = 0; i < adapters.size(); i++) {
- IntBuffer dimConversion = dimConversions.get(i);
- if (dimConversion != null) {
- seekers[i] = new IndexSeekerWithConversion((IntBuffer)
dimConversion.asReadOnlyBuffer().rewind());
- } else {
- try (CloseableIndexed<String> dimValueLookup =
adapters.get(i).getDimValueLookup(dimension)) {
- seekers[i] = new IndexSeekerWithoutConversion(dimValueLookup == null
? 0 : dimValueLookup.size());
- }
- catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- }
- }
- return seekers;
- }
+ void initialize() throws IOException;
- private boolean allNull(Indexed<String> dimValues)
- {
- for (int i = 0, size = dimValues.size(); i < size; i++) {
- if (dimValues.get(i) != null) {
- return false;
- }
- }
- return true;
+ /**
+ * Merge extended indexes for the given dictionaryId value. The merged
bitmap index from
+ * {@link DictionaryEncodedColumnMerger#mergeBitmaps} is supplied should
it be useful for the construction of
+ * the extended indexes.
+ */
+ void mergeIndexes(int dictId, MutableBitmap mergedIndexes) throws
IOException;
+ void write() throws IOException;
}
}
diff --git a/processing/src/main/java/org/apache/druid/segment/DictionaryMergingIterator.java b/processing/src/main/java/org/apache/druid/segment/DictionaryMergingIterator.java
new file mode 100644
index 0000000..56950d8
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/DictionaryMergingIterator.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import org.apache.druid.java.util.common.ByteBufferUtils;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.segment.data.Indexed;
+
+import java.nio.ByteBuffer;
+import java.nio.IntBuffer;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.PriorityQueue;
+
+/**
+ * Iterator for merging dictionaries for some comparable type into a single sorted dictionary, useful when merging
+ * dictionary encoded columns.
+ *
+ * This is a k-way merge over the input dictionaries using a priority queue ordered by the supplied comparator, so each
+ * input is expected to already be sorted consistently with that comparator. A value present in several inputs (per
+ * {@link Objects#equals}) is emitted once. While iterating, the merged id of every input value is recorded in
+ * {@link #conversions}, so that ids of each input dictionary can later be translated to merged ids.
+ *
+ * When constructed with {@code useDirect = true}, the conversion buffers are allocated off-heap and are released by
+ * {@link #close()}.
+ */
+public class DictionaryMergingIterator<T extends Comparable<T>> implements CloseableIterator<T>
+{
+  private static final Logger log = new Logger(DictionaryMergingIterator.class);
+
+  /**
+   * Creates a comparator that orders queue entries by the next value of their iterator, using the natural ordering of
+   * {@code T} with nulls first.
+   */
+  public static <T extends Comparable<T>> Comparator<Pair<Integer, PeekingIterator<T>>> makePeekingComparator()
+  {
+    return (lhs, rhs) -> {
+      T left = lhs.rhs.peek();
+      T right = rhs.rhs.peek();
+      if (left == null) {
+        //noinspection VariableNotUsedInsideIf
+        return right == null ? 0 : -1;
+      } else if (right == null) {
+        return 1;
+      } else {
+        return left.compareTo(right);
+      }
+    };
+  }
+
+  /**
+   * For each input dictionary, the merged id assigned to each of its values, in the input's own order. The entry for
+   * a null input stays null.
+   */
+  protected final IntBuffer[] conversions;
+  protected final List<Pair<ByteBuffer, Integer>> directBufferAllocations = new ArrayList<>();
+  protected final PriorityQueue<Pair<Integer, PeekingIterator<T>>> pQueue;
+
+  /**
+   * Merged id that will be assigned to the next distinct value returned by {@link #next()}.
+   */
+  protected int counter;
+
+  public DictionaryMergingIterator(
+      Indexed<T>[] dimValueLookups,
+      Comparator<Pair<Integer, PeekingIterator<T>>> comparator,
+      boolean useDirect
+  )
+  {
+    // PriorityQueue rejects an initial capacity below 1, so guard against an empty input array
+    pQueue = new PriorityQueue<>(Math.max(1, dimValueLookups.length), comparator);
+    conversions = new IntBuffer[dimValueLookups.length];
+
+    long directBufferTotalSize = 0;
+    for (int i = 0; i < conversions.length; i++) {
+      if (dimValueLookups[i] == null) {
+        continue;
+      }
+      Indexed<T> indexed = dimValueLookups[i];
+      if (useDirect) {
+        int allocationSize = indexed.size() * Integer.BYTES;
+        log.trace("Allocating dictionary merging direct buffer with size[%,d]", allocationSize);
+        directBufferTotalSize += allocationSize;
+        final ByteBuffer conversionDirectBuffer = ByteBuffer.allocateDirect(allocationSize);
+        conversions[i] = conversionDirectBuffer.asIntBuffer();
+        directBufferAllocations.add(new Pair<>(conversionDirectBuffer, allocationSize));
+      } else {
+        // heap buffers are neither freed by close() nor part of the direct-buffer total logged below
+        conversions[i] = IntBuffer.allocate(indexed.size());
+      }
+
+      final PeekingIterator<T> iter = transformIndexedIterator(indexed);
+      if (iter.hasNext()) {
+        pQueue.add(Pair.of(i, iter));
+      }
+    }
+    log.debug("Allocated [%,d] bytes of dictionary merging direct buffers", directBufferTotalSize);
+  }
+
+  @Override
+  public boolean hasNext()
+  {
+    return !pQueue.isEmpty();
+  }
+
+  /**
+   * Returns the next smallest value across all inputs, assigning it the next merged id and recording that id for
+   * every input whose next value equals it.
+   *
+   * @throws NoSuchElementException if all inputs are exhausted
+   */
+  @Override
+  public T next()
+  {
+    // poll() returns null on an empty queue (unlike remove()), which keeps the explicit check below reachable
+    final Pair<Integer, PeekingIterator<T>> smallest = pQueue.poll();
+    if (smallest == null) {
+      throw new NoSuchElementException();
+    }
+    final T value = writeTranslate(smallest, counter);
+
+    // the same value may sit at the head of several inputs; give all of them the same merged id
+    while (!pQueue.isEmpty() && Objects.equals(value, pQueue.peek().rhs.peek())) {
+      writeTranslate(pQueue.remove(), counter);
+    }
+    counter++;
+
+    return value;
+  }
+
+  /**
+   * Returns the iterator used to read one input dictionary. Note that this is invoked from the constructor.
+   */
+  protected PeekingIterator<T> transformIndexedIterator(Indexed<T> indexed)
+  {
+    return Iterators.peekingIterator(indexed.iterator());
+  }
+
+  /**
+   * Returns true if the ids of input {@code index} differ from the merged ids, i.e. its conversion buffer is not the
+   * identity mapping. Only meaningful once this iterator has been exhausted, and only for non-null inputs.
+   */
+  protected boolean needConversion(int index)
+  {
+    IntBuffer readOnly = conversions[index].asReadOnlyBuffer();
+    readOnly.rewind();
+    int i = 0;
+    while (readOnly.hasRemaining()) {
+      if (i != readOnly.get()) {
+        return true;
+      }
+      i++;
+    }
+    return false;
+  }
+
+  /**
+   * Consumes the next value of the given input, records {@code counter} as its merged id, and puts the input back on
+   * the queue if it has more values.
+   */
+  protected T writeTranslate(Pair<Integer, PeekingIterator<T>> smallest, int counter)
+  {
+    final int index = smallest.lhs;
+    final T value = smallest.rhs.next();
+
+    conversions[index].put(counter);
+    if (smallest.rhs.hasNext()) {
+      pQueue.add(smallest);
+    }
+    return value;
+  }
+
+  @Override
+  public void remove()
+  {
+    throw new UnsupportedOperationException("remove");
+  }
+
+  /**
+   * Frees the direct conversion buffers allocated when {@code useDirect} was true. Calling this more than once does
+   * not free them again.
+   */
+  @Override
+  public void close()
+  {
+    long directBufferTotalSize = 0;
+    for (Pair<ByteBuffer, Integer> bufferAllocation : directBufferAllocations) {
+      log.trace("Freeing dictionary merging direct buffer with size[%,d]", bufferAllocation.rhs);
+      directBufferTotalSize += bufferAllocation.rhs;
+      ByteBufferUtils.free(bufferAllocation.lhs);
+    }
+    // forget the freed buffers so that a repeated close() does not free them a second time
+    directBufferAllocations.clear();
+    log.debug("Freed [%,d] bytes of dictionary merging direct buffers", directBufferTotalSize);
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/DimensionDictionary.java b/processing/src/main/java/org/apache/druid/segment/DimensionDictionary.java
new file mode 100644
index 0000000..345cd7b
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/DimensionDictionary.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Buildable dictionary for some comparable type. Ids are assigned sequentially in the order in which values are first
+ * added, so the dictionary itself is unsorted. A {@link SortedDimensionDictionary} that maps between these ids and
+ * sorted ids can be built with {@link #sort()}.
+ *
+ * Null is supported as a value. It receives an id like any other value (see {@link #getIdForNull()}), but it is not
+ * stored in the value-to-id hash map and does not take part in min/max tracking.
+ *
+ * This dictionary is thread-safe. The value maps and min/max are guarded by a {@link ReentrantReadWriteLock}, and
+ * the null id is held in a volatile field.
+ */
+public class DimensionDictionary<T extends Comparable<T>>
+{
+  /**
+   * Id reported for values that have not been added to the dictionary.
+   */
+  public static final int ABSENT_VALUE_ID = -1;
+
+  @Nullable
+  private T minValue = null;
+  @Nullable
+  private T maxValue = null;
+  // volatile so that getIdForNull() can read it without acquiring the lock
+  private volatile int idForNull = ABSENT_VALUE_ID;
+
+  private final Object2IntMap<T> valueToId = new Object2IntOpenHashMap<>();
+
+  private final List<T> idToValue = new ArrayList<>();
+  private final ReentrantReadWriteLock lock;
+
+  public DimensionDictionary()
+  {
+    this.lock = new ReentrantReadWriteLock();
+    valueToId.defaultReturnValue(ABSENT_VALUE_ID);
+  }
+
+  /**
+   * Looks up the id of a value.
+   *
+   * @param value value to look up; may be null
+   *
+   * @return the id of {@code value}, or {@link #ABSENT_VALUE_ID} if it has not been added
+   */
+  public int getId(@Nullable T value)
+  {
+    lock.readLock().lock();
+    try {
+      if (value == null) {
+        return idForNull;
+      }
+      return valueToId.getInt(value);
+    }
+    finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Looks up the value for an id. While no null value has been added, {@link #ABSENT_VALUE_ID} also resolves to
+   * null, because in that state it equals the null id.
+   *
+   * @return the value with the given id, or null for the null id
+   *
+   * @throws IndexOutOfBoundsException if {@code id} is neither the null id nor a valid id of this dictionary
+   */
+  @Nullable
+  public T getValue(int id)
+  {
+    lock.readLock().lock();
+    try {
+      if (id == idForNull) {
+        return null;
+      }
+      return idToValue.get(id);
+    }
+    finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * @return the number of distinct values added so far, including null if it has been added
+   */
+  public int size()
+  {
+    lock.readLock().lock();
+    try {
+      // using idToValue rather than valueToId because the valueToId doesn't account null value, if it is present.
+      return idToValue.size();
+    }
+    finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Adds a value if it is not already present.
+   *
+   * @param originalValue value to add; may be null
+   *
+   * @return the id of the value. A newly added value receives the next sequential id, which is the size before the add.
+   */
+  public int add(@Nullable T originalValue)
+  {
+    lock.writeLock().lock();
+    try {
+      if (originalValue == null) {
+        if (idForNull == ABSENT_VALUE_ID) {
+          idForNull = idToValue.size();
+          idToValue.add(null);
+        }
+        return idForNull;
+      }
+      int prev = valueToId.getInt(originalValue);
+      if (prev >= 0) {
+        return prev;
+      }
+      final int index = idToValue.size();
+      valueToId.put(originalValue, index);
+      idToValue.add(originalValue);
+      minValue = minValue == null || minValue.compareTo(originalValue) > 0 ? originalValue : minValue;
+      maxValue = maxValue == null || maxValue.compareTo(originalValue) < 0 ? originalValue : maxValue;
+      return index;
+    }
+    finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * @return the smallest non-null value added so far, or null if no non-null value has been added
+   */
+  @Nullable
+  public T getMinValue()
+  {
+    lock.readLock().lock();
+    try {
+      return minValue;
+    }
+    finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * @return the largest non-null value added so far, or null if no non-null value has been added
+   */
+  @Nullable
+  public T getMaxValue()
+  {
+    lock.readLock().lock();
+    try {
+      return maxValue;
+    }
+    finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * @return the id assigned to null, or {@link #ABSENT_VALUE_ID} if null has not been added
+   */
+  public int getIdForNull()
+  {
+    return idForNull;
+  }
+
+  /**
+   * Builds a sorted dictionary over the values currently held, taken under the read lock.
+   */
+  public SortedDimensionDictionary<T> sort()
+  {
+    lock.readLock().lock();
+    try {
+      return new SortedDimensionDictionary<>(idToValue, idToValue.size());
+    }
+    finally {
+      lock.readLock().unlock();
+    }
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/IndexMerger.java
b/processing/src/main/java/org/apache/druid/segment/IndexMerger.java
index 3876bf1..991cbdd 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexMerger.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexMerger.java
@@ -22,21 +22,13 @@ package org.apache.druid.segment;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
-import com.google.common.collect.PeekingIterator;
import com.google.inject.ImplementedBy;
-import org.apache.druid.common.config.NullHandling;
import org.apache.druid.common.utils.SerializerUtils;
import org.apache.druid.data.input.impl.DimensionsSpec;
-import org.apache.druid.java.util.common.ByteBufferUtils;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.aggregation.AggregatorFactory;
-import org.apache.druid.segment.data.Indexed;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.utils.CollectionUtils;
@@ -45,15 +37,9 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.IntBuffer;
import java.util.ArrayList;
-import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.Objects;
-import java.util.PriorityQueue;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
@@ -292,70 +278,6 @@ public interface IndexMerger
@Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
) throws IOException;
- interface IndexSeeker
- {
- int NOT_EXIST = -1;
- int NOT_INIT = -1;
-
- int seek(int dictId);
- }
-
- /**
- * Get old dictId from new dictId, and only support access in order
- */
- class IndexSeekerWithConversion implements IndexSeeker
- {
- private final IntBuffer dimConversions;
- private int currIndex;
- private int currVal;
- private int lastVal;
-
- IndexSeekerWithConversion(IntBuffer dimConversions)
- {
- this.dimConversions = dimConversions;
- this.currIndex = 0;
- this.currVal = IndexSeeker.NOT_INIT;
- this.lastVal = IndexSeeker.NOT_INIT;
- }
-
- @Override
- public int seek(int dictId)
- {
- if (dimConversions == null) {
- return IndexSeeker.NOT_EXIST;
- }
- if (lastVal != IndexSeeker.NOT_INIT) {
- if (dictId <= lastVal) {
- throw new ISE(
- "Value dictId[%d] is less than the last value dictId[%d] I have,
cannot be.",
- dictId, lastVal
- );
- }
- return IndexSeeker.NOT_EXIST;
- }
- if (currVal == IndexSeeker.NOT_INIT) {
- currVal = dimConversions.get();
- }
- if (currVal == dictId) {
- int ret = currIndex;
- ++currIndex;
- if (dimConversions.hasRemaining()) {
- currVal = dimConversions.get();
- } else {
- lastVal = dictId;
- }
- return ret;
- } else if (currVal < dictId) {
- throw new ISE(
- "Skipped currValue dictId[%d], currIndex[%d]; incoming value
dictId[%d]",
- currVal, currIndex, dictId
- );
- } else {
- return IndexSeeker.NOT_EXIST;
- }
- }
- }
-
/**
* This method applies {@link
DimensionMerger#convertSortedSegmentRowValuesToMergedRowValues(int,
ColumnValueSelector)} to
* all dimension column selectors of the given sourceRowIterator, using the
given index number.
@@ -423,133 +345,4 @@ public interface IndexMerger
}
};
}
-
- class DictionaryMergeIterator implements CloseableIterator<String>
- {
- /**
- * Don't replace this lambda with {@link Comparator#comparing} or {@link
Comparators#naturalNullsFirst()} because
- * this comparator is hot, so we want to avoid extra indirection layers.
- */
- static final Comparator<Pair<Integer, PeekingIterator<String>>>
NULLS_FIRST_PEEKING_COMPARATOR = (lhs, rhs) -> {
- String left = lhs.rhs.peek();
- String right = rhs.rhs.peek();
- if (left == null) {
- //noinspection VariableNotUsedInsideIf
- return right == null ? 0 : -1;
- } else if (right == null) {
- return 1;
- } else {
- return left.compareTo(right);
- }
- };
-
- protected final IntBuffer[] conversions;
- protected final List<Pair<ByteBuffer, Integer>> directBufferAllocations =
new ArrayList<>();
- protected final PriorityQueue<Pair<Integer, PeekingIterator<String>>>
pQueue;
-
- protected int counter;
-
- DictionaryMergeIterator(Indexed<String>[] dimValueLookups, boolean
useDirect)
- {
- pQueue = new PriorityQueue<>(dimValueLookups.length,
NULLS_FIRST_PEEKING_COMPARATOR);
- conversions = new IntBuffer[dimValueLookups.length];
-
- long mergeBufferTotalSize = 0;
- for (int i = 0; i < conversions.length; i++) {
- if (dimValueLookups[i] == null) {
- continue;
- }
- Indexed<String> indexed = dimValueLookups[i];
- if (useDirect) {
- int allocationSize = indexed.size() * Integer.BYTES;
- log.trace("Allocating dictionary merging direct buffer with
size[%,d]", allocationSize);
- mergeBufferTotalSize += allocationSize;
- final ByteBuffer conversionDirectBuffer =
ByteBuffer.allocateDirect(allocationSize);
- conversions[i] = conversionDirectBuffer.asIntBuffer();
- directBufferAllocations.add(new Pair<>(conversionDirectBuffer,
allocationSize));
- } else {
- conversions[i] = IntBuffer.allocate(indexed.size());
- mergeBufferTotalSize += indexed.size();
- }
-
- final PeekingIterator<String> iter = Iterators.peekingIterator(
- Iterators.transform(
- indexed.iterator(),
- NullHandling::nullToEmptyIfNeeded
- )
- );
- if (iter.hasNext()) {
- pQueue.add(Pair.of(i, iter));
- }
- }
- log.debug("Allocated [%,d] bytes of dictionary merging direct buffers",
mergeBufferTotalSize);
- }
-
- @Override
- public boolean hasNext()
- {
- return !pQueue.isEmpty();
- }
-
- @Override
- public String next()
- {
- Pair<Integer, PeekingIterator<String>> smallest = pQueue.remove();
- if (smallest == null) {
- throw new NoSuchElementException();
- }
- final String value = writeTranslate(smallest, counter);
-
- while (!pQueue.isEmpty() && Objects.equals(value,
pQueue.peek().rhs.peek())) {
- writeTranslate(pQueue.remove(), counter);
- }
- counter++;
-
- return value;
- }
-
- boolean needConversion(int index)
- {
- IntBuffer readOnly = conversions[index].asReadOnlyBuffer();
- readOnly.rewind();
- int i = 0;
- while (readOnly.hasRemaining()) {
- if (i != readOnly.get()) {
- return true;
- }
- i++;
- }
- return false;
- }
-
- private String writeTranslate(Pair<Integer, PeekingIterator<String>>
smallest, int counter)
- {
- final int index = smallest.lhs;
- final String value = smallest.rhs.next();
-
- conversions[index].put(counter);
- if (smallest.rhs.hasNext()) {
- pQueue.add(smallest);
- }
- return value;
- }
-
- @Override
- public void remove()
- {
- throw new UnsupportedOperationException("remove");
- }
-
- @Override
- public void close()
- {
- long mergeBufferTotalSize = 0;
- for (Pair<ByteBuffer, Integer> bufferAllocation :
directBufferAllocations) {
- log.trace("Freeing dictionary merging direct buffer with size[%,d]",
bufferAllocation.rhs);
- mergeBufferTotalSize += bufferAllocation.rhs;
- ByteBufferUtils.free(bufferAllocation.lhs);
- }
- log.debug("Freed [%,d] bytes of dictionary merging direct buffers",
mergeBufferTotalSize);
- }
- }
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/SortedDimensionDictionary.java
b/processing/src/main/java/org/apache/druid/segment/SortedDimensionDictionary.java
new file mode 100644
index 0000000..934f494
--- /dev/null
+++
b/processing/src/main/java/org/apache/druid/segment/SortedDimensionDictionary.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.collect.Lists;
+import it.unimi.dsi.fastutil.ints.IntIterator;
+import it.unimi.dsi.fastutil.objects.Object2IntRBTreeMap;
+import it.unimi.dsi.fastutil.objects.Object2IntSortedMap;
+import org.apache.druid.java.util.common.guava.Comparators;
+
+import java.util.List;
+
+/**
+ * Sorted view over an existing dictionary. The constructor takes the values of an unsorted dictionary, indexed by
+ * their unsorted id, and orders them with nulls first. It records the id mapping in both directions: unsorted id to
+ * sorted position, and sorted position to unsorted id.
+ */
+public class SortedDimensionDictionary<T extends Comparable<T>>
+{
+  private final List<T> sortedVals;
+  // unsorted id -> sorted position
+  private final int[] idToIndex;
+  // sorted position -> unsorted id
+  private final int[] indexToId;
+
+  /**
+   * @param idToValue values of the source dictionary; list position is the unsorted id
+   * @param length    number of leading entries of {@code idToValue} to include
+   */
+  public SortedDimensionDictionary(List<T> idToValue, int length)
+  {
+    final Object2IntSortedMap<T> valueToUnsortedId = new Object2IntRBTreeMap<>(Comparators.naturalNullsFirst());
+    int unsortedId = 0;
+    while (unsortedId < length) {
+      valueToUnsortedId.put(idToValue.get(unsortedId), unsortedId);
+      unsortedId++;
+    }
+    this.sortedVals = Lists.newArrayList(valueToUnsortedId.keySet());
+    this.idToIndex = new int[length];
+    this.indexToId = new int[length];
+
+    // the tree map iterates in sorted key order, so the n-th value seen belongs at sorted position n
+    final IntIterator unsortedIds = valueToUnsortedId.values().iterator();
+    for (int sortedIndex = 0; unsortedIds.hasNext(); sortedIndex++) {
+      final int id = unsortedIds.nextInt();
+      indexToId[sortedIndex] = id;
+      idToIndex[id] = sortedIndex;
+    }
+  }
+
+  /**
+   * @return the unsorted id of the value at sorted position {@code index}
+   */
+  public int getUnsortedIdFromSortedId(int index)
+  {
+    return indexToId[index];
+  }
+
+  /**
+   * @return the sorted position of the value whose unsorted id is {@code id}
+   */
+  public int getSortedIdFromUnsortedId(int id)
+  {
+    return idToIndex[id];
+  }
+
+  /**
+   * @return the value at sorted position {@code index}. This may be null, because null sorts first.
+   */
+  public T getValueFromSortedId(int index)
+  {
+    return sortedVals.get(index);
+  }
+}
diff --git
a/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java
b/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java
index f2b4647..108db6e 100644
---
a/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java
+++
b/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java
@@ -21,21 +21,14 @@ package org.apache.druid.segment;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
-import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import it.unimi.dsi.fastutil.ints.IntArrays;
-import it.unimi.dsi.fastutil.ints.IntIterator;
-import it.unimi.dsi.fastutil.objects.Object2IntMap;
-import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
-import it.unimi.dsi.fastutil.objects.Object2IntRBTreeMap;
-import it.unimi.dsi.fastutil.objects.Object2IntSortedMap;
import org.apache.druid.collections.bitmap.BitmapFactory;
import org.apache.druid.collections.bitmap.MutableBitmap;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.impl.DimensionSchema.MultiValueHandling;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.Comparators;
-import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.query.filter.ValueMatcher;
@@ -44,9 +37,7 @@ import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.data.ArrayBasedIndexedInts;
-import org.apache.druid.segment.data.CloseableIndexed;
import org.apache.druid.segment.data.IndexedInts;
-import org.apache.druid.segment.data.IndexedIterable;
import org.apache.druid.segment.filter.BooleanValueMatcher;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexRow;
@@ -54,199 +45,25 @@ import
org.apache.druid.segment.incremental.IncrementalIndexRowHolder;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import javax.annotation.Nullable;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
-import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-public class StringDimensionIndexer implements DimensionIndexer<Integer,
int[], String>
+public class StringDimensionIndexer extends
DictionaryEncodedColumnIndexer<int[], String>
{
-
@Nullable
private static String emptyToNullIfNeeded(@Nullable Object o)
{
return o != null ? NullHandling.emptyToNullIfNeeded(o.toString()) : null;
}
- private static final int ABSENT_VALUE_ID = -1;
-
- private static class DimensionDictionary
- {
- @Nullable
- private String minValue = null;
- @Nullable
- private String maxValue = null;
- private volatile int idForNull = ABSENT_VALUE_ID;
-
- private final Object2IntMap<String> valueToId = new
Object2IntOpenHashMap<>();
-
- private final List<String> idToValue = new ArrayList<>();
- private final ReentrantReadWriteLock lock;
-
- public DimensionDictionary()
- {
- this.lock = new ReentrantReadWriteLock();
- valueToId.defaultReturnValue(ABSENT_VALUE_ID);
- }
-
- public int getId(@Nullable String value)
- {
- lock.readLock().lock();
- try {
- if (value == null) {
- return idForNull;
- }
- return valueToId.getInt(value);
- }
- finally {
- lock.readLock().unlock();
- }
- }
-
- @Nullable
- public String getValue(int id)
- {
- lock.readLock().lock();
- try {
- if (id == idForNull) {
- return null;
- }
- return idToValue.get(id);
- }
- finally {
- lock.readLock().unlock();
- }
- }
-
- public int size()
- {
- lock.readLock().lock();
- try {
- // using idToValue rather than valueToId because the valueToId doesn't
account null value, if it is present.
- return idToValue.size();
- }
- finally {
- lock.readLock().unlock();
- }
- }
-
- public int add(@Nullable String originalValue)
- {
- lock.writeLock().lock();
- try {
- if (originalValue == null) {
- if (idForNull == ABSENT_VALUE_ID) {
- idForNull = idToValue.size();
- idToValue.add(null);
- }
- return idForNull;
- }
- int prev = valueToId.getInt(originalValue);
- if (prev >= 0) {
- return prev;
- }
- final int index = idToValue.size();
- valueToId.put(originalValue, index);
- idToValue.add(originalValue);
- minValue = minValue == null || minValue.compareTo(originalValue) > 0 ?
originalValue : minValue;
- maxValue = maxValue == null || maxValue.compareTo(originalValue) < 0 ?
originalValue : maxValue;
- return index;
- }
- finally {
- lock.writeLock().unlock();
- }
- }
-
- public String getMinValue()
- {
- lock.readLock().lock();
- try {
- return minValue;
- }
- finally {
- lock.readLock().unlock();
- }
- }
-
- public String getMaxValue()
- {
- lock.readLock().lock();
- try {
- return maxValue;
- }
- finally {
- lock.readLock().unlock();
- }
- }
-
- public SortedDimensionDictionary sort()
- {
- lock.readLock().lock();
- try {
- return new SortedDimensionDictionary(idToValue, idToValue.size());
- }
- finally {
- lock.readLock().unlock();
- }
- }
- }
-
- private static class SortedDimensionDictionary
- {
- private final List<String> sortedVals;
- private final int[] idToIndex;
- private final int[] indexToId;
-
- public SortedDimensionDictionary(List<String> idToValue, int length)
- {
- Object2IntSortedMap<String> sortedMap = new
Object2IntRBTreeMap<>(Comparators.naturalNullsFirst());
- for (int id = 0; id < length; id++) {
- String value = idToValue.get(id);
- sortedMap.put(value, id);
- }
- this.sortedVals = Lists.newArrayList(sortedMap.keySet());
- this.idToIndex = new int[length];
- this.indexToId = new int[length];
- int index = 0;
- for (IntIterator iterator = sortedMap.values().iterator();
iterator.hasNext(); ) {
- int id = iterator.nextInt();
- idToIndex[id] = index;
- indexToId[index] = id;
- index++;
- }
- }
-
- public int getUnsortedIdFromSortedId(int index)
- {
- return indexToId[index];
- }
-
- public int getSortedIdFromUnsortedId(int id)
- {
- return idToIndex[id];
- }
-
- public String getValueFromSortedId(int index)
- {
- return sortedVals.get(index);
- }
- }
-
- private final DimensionDictionary dimLookup;
private final MultiValueHandling multiValueHandling;
private final boolean hasBitmapIndexes;
private final boolean hasSpatialIndexes;
private volatile boolean hasMultipleValues = false;
- private volatile boolean isSparse = false;
-
- @Nullable
- private SortedDimensionDictionary sortedLookup;
public StringDimensionIndexer(MultiValueHandling multiValueHandling, boolean
hasBitmapIndexes, boolean hasSpatialIndexes)
{
- this.dimLookup = new DimensionDictionary();
this.multiValueHandling = multiValueHandling == null ?
MultiValueHandling.ofDefault() : multiValueHandling;
this.hasBitmapIndexes = hasBitmapIndexes;
this.hasSpatialIndexes = hasSpatialIndexes;
@@ -260,7 +77,7 @@ public class StringDimensionIndexer implements
DimensionIndexer<Integer, int[],
if (dimValues == null) {
final int nullId = dimLookup.getId(null);
- encodedDimensionValues = nullId == ABSENT_VALUE_ID ? new
int[]{dimLookup.add(null)} : new int[]{nullId};
+ encodedDimensionValues = nullId == DimensionDictionary.ABSENT_VALUE_ID ?
new int[]{dimLookup.add(null)} : new int[]{nullId};
} else if (dimValues instanceof List) {
List<Object> dimValuesList = (List<Object>) dimValues;
if (dimValuesList.isEmpty()) {
@@ -309,12 +126,6 @@ public class StringDimensionIndexer implements
DimensionIndexer<Integer, int[],
}
@Override
- public void setSparseIndexed()
- {
- isSparse = true;
- }
-
- @Override
public long estimateEncodedKeyComponentSize(int[] key)
{
// string length is being accounted for each time they are referenced,
based on dimension handler interface,
@@ -336,91 +147,6 @@ public class StringDimensionIndexer implements
DimensionIndexer<Integer, int[],
return estimatedSize;
}
- public Integer getSortedEncodedValueFromUnsorted(Integer
unsortedIntermediateValue)
- {
- return sortedLookup().getSortedIdFromUnsortedId(unsortedIntermediateValue);
- }
-
- @Override
- public Integer getUnsortedEncodedValueFromSorted(Integer
sortedIntermediateValue)
- {
- return sortedLookup().getUnsortedIdFromSortedId(sortedIntermediateValue);
- }
-
- @Override
- public CloseableIndexed<String> getSortedIndexedValues()
- {
- return new CloseableIndexed<String>()
- {
-
- @Override
- public int size()
- {
- return getCardinality();
- }
-
- @Override
- public String get(int index)
- {
- return getActualValue(index, true);
- }
-
- @Override
- public int indexOf(String value)
- {
- int id = getEncodedValue(value, false);
- return id < 0 ? ABSENT_VALUE_ID :
getSortedEncodedValueFromUnsorted(id);
- }
-
- @Override
- public Iterator<String> iterator()
- {
- return IndexedIterable.create(this).iterator();
- }
-
- @Override
- public void inspectRuntimeShape(RuntimeShapeInspector inspector)
- {
- // nothing to inspect
- }
-
- @Override
- public void close()
- {
- // nothing to close
- }
- };
- }
-
- @Override
- public String getMinValue()
- {
- return dimLookup.getMinValue();
- }
-
- @Override
- public String getMaxValue()
- {
- return dimLookup.getMaxValue();
- }
-
- @Override
- public int getCardinality()
- {
- return dimLookup.size();
- }
-
- /**
- * returns true if all values are encoded in {@link #dimLookup}
- */
- private boolean dictionaryEncodesAllValues()
- {
- // name lookup is possible in advance if we explicitly process a value for
every row, or if we've encountered an
- // actual null value and it is present in our dictionary. otherwise the
dictionary will be missing ids for implicit
- // null values
- return !isSparse || dimLookup.idForNull != ABSENT_VALUE_ID;
- }
-
@Override
public int compareUnsortedEncodedKeyComponents(int[] lhs, int[] rhs)
{
@@ -432,7 +158,7 @@ public class StringDimensionIndexer implements
DimensionIndexer<Integer, int[],
// if the values don't have the same length, check if we're comparing []
and [null], which are equivalent
if (lhsLen + rhsLen == 1) {
int[] longerVal = rhsLen > lhsLen ? rhs : lhs;
- if (longerVal[0] == dimLookup.idForNull) {
+ if (longerVal[0] == dimLookup.getIdForNull()) {
return 0;
} else {
//noinspection ArrayEquality -- longerVal is explicitly set to only
lhs or rhs
@@ -506,7 +232,7 @@ public class StringDimensionIndexer implements
DimensionIndexer<Integer, int[],
capabilites.setDictionaryEncoded(true);
}
- if (isSparse || dimLookup.idForNull != ABSENT_VALUE_ID) {
+ if (isSparse || dimLookup.getIdForNull() !=
DimensionDictionary.ABSENT_VALUE_ID) {
capabilites.setHasNulls(true);
}
return capabilites;
@@ -724,7 +450,7 @@ public class StringDimensionIndexer implements
DimensionIndexer<Integer, int[],
} else {
// Can happen if a value was added to our dimLookup after this
selector was created. Act like it
// doesn't exist.
- return ABSENT_VALUE_ID;
+ return DimensionDictionary.ABSENT_VALUE_ID;
}
}
@@ -762,15 +488,6 @@ public class StringDimensionIndexer implements
DimensionIndexer<Integer, int[],
return new IndexerDimensionSelector();
}
- @Override
- public ColumnValueSelector<?> makeColumnValueSelector(
- IncrementalIndexRowHolder currEntry,
- IncrementalIndex.DimensionDesc desc
- )
- {
- return makeDimensionSelector(DefaultDimensionSpec.of(desc.getName()),
currEntry, desc);
- }
-
@Nullable
@Override
@@ -792,90 +509,6 @@ public class StringDimensionIndexer implements
DimensionIndexer<Integer, int[],
}
@Override
- public ColumnValueSelector convertUnsortedValuesToSorted(ColumnValueSelector
selectorWithUnsortedValues)
- {
- DimensionSelector dimSelectorWithUnsortedValues = (DimensionSelector)
selectorWithUnsortedValues;
- class SortedDimensionSelector implements DimensionSelector, IndexedInts
- {
- @Override
- public int size()
- {
- return dimSelectorWithUnsortedValues.getRow().size();
- }
-
- @Override
- public int get(int index)
- {
- return
sortedLookup().getSortedIdFromUnsortedId(dimSelectorWithUnsortedValues.getRow().get(index));
- }
-
- @Override
- public IndexedInts getRow()
- {
- return this;
- }
-
- @Override
- public ValueMatcher makeValueMatcher(@Nullable String value)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public ValueMatcher makeValueMatcher(Predicate<String> predicate)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int getValueCardinality()
- {
- return dimSelectorWithUnsortedValues.getValueCardinality();
- }
-
- @Nullable
- @Override
- public String lookupName(int id)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean nameLookupPossibleInAdvance()
- {
- throw new UnsupportedOperationException();
- }
-
- @Nullable
- @Override
- public IdLookup idLookup()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void inspectRuntimeShape(RuntimeShapeInspector inspector)
- {
- inspector.visit("dimSelectorWithUnsortedValues",
dimSelectorWithUnsortedValues);
- }
-
- @Nullable
- @Override
- public Object getObject()
- {
- return dimSelectorWithUnsortedValues.getObject();
- }
-
- @Override
- public Class classOfObject()
- {
- return dimSelectorWithUnsortedValues.classOfObject();
- }
- }
- return new SortedDimensionSelector();
- }
-
- @Override
public void fillBitmapsFromUnsortedEncodedKeyComponent(
int[] key,
int rowNum,
@@ -894,31 +527,4 @@ public class StringDimensionIndexer implements
DimensionIndexer<Integer, int[],
bitmapIndexes[dimValIdx].add(rowNum);
}
}
-
- private SortedDimensionDictionary sortedLookup()
- {
- return sortedLookup == null ? sortedLookup = dimLookup.sort() :
sortedLookup;
- }
-
- @Nullable
- private String getActualValue(int intermediateValue, boolean idSorted)
- {
- if (idSorted) {
- return sortedLookup().getValueFromSortedId(intermediateValue);
- } else {
- return dimLookup.getValue(intermediateValue);
-
- }
- }
-
- private int getEncodedValue(String fullValue, boolean idSorted)
- {
- int unsortedId = dimLookup.getId(fullValue);
-
- if (idSorted) {
- return sortedLookup().getSortedIdFromUnsortedId(unsortedId);
- } else {
- return unsortedId;
- }
- }
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java
b/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java
index abb4637..e4e0fbd 100644
---
a/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java
+++
b/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java
@@ -19,94 +19,47 @@
package org.apache.druid.segment;
-import com.google.common.base.Predicate;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
-import it.unimi.dsi.fastutil.ints.IntIterable;
-import it.unimi.dsi.fastutil.ints.IntIterator;
+import com.google.common.collect.PeekingIterator;
import org.apache.druid.collections.bitmap.BitmapFactory;
-import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.collections.bitmap.MutableBitmap;
import org.apache.druid.collections.spatial.ImmutableRTree;
import org.apache.druid.collections.spatial.RTree;
import org.apache.druid.collections.spatial.split.LinearGutmanSplitStrategy;
import org.apache.druid.common.config.NullHandling;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.io.Closer;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.query.filter.ValueMatcher;
-import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnDescriptor;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.BitmapSerdeFactory;
-import org.apache.druid.segment.data.BitmapValues;
import org.apache.druid.segment.data.ByteBufferWriter;
-import org.apache.druid.segment.data.CloseableIndexed;
-import org.apache.druid.segment.data.ColumnarIntsSerializer;
-import org.apache.druid.segment.data.ColumnarMultiIntsSerializer;
-import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSerializer;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.data.GenericIndexed;
-import org.apache.druid.segment.data.GenericIndexedWriter;
import org.apache.druid.segment.data.ImmutableRTreeObjectStrategy;
import org.apache.druid.segment.data.Indexed;
-import org.apache.druid.segment.data.IndexedInts;
import org.apache.druid.segment.data.ListIndexed;
-import org.apache.druid.segment.data.SingleValueColumnarIntsSerializer;
-import
org.apache.druid.segment.data.V3CompressedVSizeColumnarMultiIntsSerializer;
-import org.apache.druid.segment.data.VSizeColumnarIntsSerializer;
-import org.apache.druid.segment.data.VSizeColumnarMultiIntsSerializer;
+import org.apache.druid.segment.data.ObjectStrategy;
import org.apache.druid.segment.serde.DictionaryEncodedColumnPartSerde;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
-import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.nio.IntBuffer;
-import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
-public class StringDimensionMergerV9 implements DimensionMergerV9
+public class StringDimensionMergerV9 extends
DictionaryEncodedColumnMerger<String>
{
- private static final Logger log = new Logger(StringDimensionMergerV9.class);
-
private static final Indexed<String> NULL_STR_DIM_VAL = new
ListIndexed<>(Collections.singletonList(null));
private static final Splitter SPLITTER = Splitter.on(",");
- private final String dimensionName;
- private final ProgressIndicator progress;
- private final Closer closer;
- private final IndexSpec indexSpec;
- private final SegmentWriteOutMedium segmentWriteOutMedium;
- private final MutableBitmap nullRowsBitmap;
- private final ColumnCapabilities capabilities;
-
- private int dictionarySize;
- private int rowCount = 0;
- private int cardinality = 0;
- private boolean hasNull = false;
+ public static final Comparator<Pair<Integer, PeekingIterator<String>>>
DICTIONARY_MERGING_COMPARATOR =
+ DictionaryMergingIterator.makePeekingComparator();
@Nullable
- private GenericIndexedWriter<ImmutableBitmap> bitmapWriter;
- @Nullable
private ByteBufferWriter<ImmutableRTree> spatialWriter;
- @Nullable
- private ArrayList<IntBuffer> dimConversions;
- @Nullable
- private List<IndexableAdapter> adapters;
- @Nullable
- private IndexMerger.DictionaryMergeIterator dictionaryMergeIterator;
- @Nullable
- private ColumnarIntsSerializer encodedValueSerializer;
- @Nullable
- private GenericIndexedWriter<String> dictionaryWriter;
- @Nullable
- private String firstDictionaryValue;
-
public StringDimensionMergerV9(
String dimensionName,
@@ -117,416 +70,38 @@ public class StringDimensionMergerV9 implements
DimensionMergerV9
Closer closer
)
{
- this.dimensionName = dimensionName;
- this.indexSpec = indexSpec;
- this.capabilities = capabilities;
- this.segmentWriteOutMedium = segmentWriteOutMedium;
- nullRowsBitmap =
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
-
- this.progress = progress;
- this.closer = closer;
+ super(dimensionName, indexSpec, segmentWriteOutMedium, capabilities,
progress, closer);
}
@Override
- public void writeMergedValueDictionary(List<IndexableAdapter> adapters)
throws IOException
+ protected Comparator<Pair<Integer, PeekingIterator<String>>>
getDictionaryMergingComparator()
{
- boolean dimHasValues = false;
- boolean dimAbsentFromSomeIndex = false;
-
- long dimStartTime = System.currentTimeMillis();
-
- this.adapters = adapters;
-
- dimConversions = Lists.newArrayListWithCapacity(adapters.size());
- for (int i = 0; i < adapters.size(); ++i) {
- dimConversions.add(null);
- }
-
- int numMergeIndex = 0;
- Indexed<String> dimValueLookup = null;
- Indexed<String>[] dimValueLookups = new Indexed[adapters.size() + 1];
- for (int i = 0; i < adapters.size(); i++) {
- @SuppressWarnings("MustBeClosedChecker") // we register dimValues in the
closer
- Indexed<String> dimValues =
closer.register(adapters.get(i).getDimValueLookup(dimensionName));
- if (dimValues != null && !allNull(dimValues)) {
- dimHasValues = true;
- hasNull |= dimValues.indexOf(null) >= 0;
- dimValueLookups[i] = dimValueLookup = dimValues;
- numMergeIndex++;
- } else {
- dimAbsentFromSomeIndex = true;
- }
- }
-
- boolean convertMissingValues = dimHasValues && dimAbsentFromSomeIndex;
-
- /*
- * Ensure the empty str is always in the dictionary if the dimension was
missing from one index but
- * has non-null values in another index.
- * This is done so that IndexMerger.toMergedIndexRowIterator() can convert
null columns to empty strings
- * later on, to allow rows from indexes without a particular dimension to
merge correctly with
- * rows from indexes with null/empty str values for that dimension.
- */
- if (convertMissingValues && !hasNull) {
- hasNull = true;
- dimValueLookups[adapters.size()] = dimValueLookup = NULL_STR_DIM_VAL;
- numMergeIndex++;
- }
-
- String dictFilename = StringUtils.format("%s.dim_values", dimensionName);
- dictionaryWriter = new GenericIndexedWriter<>(segmentWriteOutMedium,
dictFilename, GenericIndexed.STRING_STRATEGY);
- firstDictionaryValue = null;
- dictionarySize = 0;
- dictionaryWriter.open();
-
- cardinality = 0;
- if (numMergeIndex > 1) {
- dictionaryMergeIterator = new
IndexMerger.DictionaryMergeIterator(dimValueLookups, true);
- writeDictionary(() -> dictionaryMergeIterator);
- for (int i = 0; i < adapters.size(); i++) {
- if (dimValueLookups[i] != null &&
dictionaryMergeIterator.needConversion(i)) {
- dimConversions.set(i, dictionaryMergeIterator.conversions[i]);
- }
- }
- cardinality = dictionaryMergeIterator.counter;
- } else if (numMergeIndex == 1) {
- writeDictionary(dimValueLookup);
- cardinality = dimValueLookup.size();
- }
-
- log.debug(
- "Completed dim[%s] conversions with cardinality[%,d] in %,d millis.",
- dimensionName,
- cardinality,
- System.currentTimeMillis() - dimStartTime
- );
-
- setupEncodedValueWriter();
- }
-
- private void writeDictionary(Iterable<String> dictionaryValues) throws
IOException
- {
- for (String value : dictionaryValues) {
- dictionaryWriter.write(value);
- value = NullHandling.emptyToNullIfNeeded(value);
- if (dictionarySize == 0) {
- firstDictionaryValue = value;
- }
- dictionarySize++;
- }
- }
-
- protected void setupEncodedValueWriter() throws IOException
- {
- final CompressionStrategy compressionStrategy =
indexSpec.getDimensionCompression();
-
- String filenameBase = StringUtils.format("%s.forward_dim", dimensionName);
- if (capabilities.hasMultipleValues().isTrue()) {
- if (compressionStrategy != CompressionStrategy.UNCOMPRESSED) {
- encodedValueSerializer =
V3CompressedVSizeColumnarMultiIntsSerializer.create(
- dimensionName,
- segmentWriteOutMedium,
- filenameBase,
- cardinality,
- compressionStrategy
- );
- } else {
- encodedValueSerializer =
- new VSizeColumnarMultiIntsSerializer(dimensionName,
segmentWriteOutMedium, cardinality);
- }
- } else {
- if (compressionStrategy != CompressionStrategy.UNCOMPRESSED) {
- encodedValueSerializer = CompressedVSizeColumnarIntsSerializer.create(
- dimensionName,
- segmentWriteOutMedium,
- filenameBase,
- cardinality,
- compressionStrategy
- );
- } else {
- encodedValueSerializer = new
VSizeColumnarIntsSerializer(segmentWriteOutMedium, cardinality);
- }
- }
- encodedValueSerializer.open();
+ return DICTIONARY_MERGING_COMPARATOR;
}
@Override
- public ColumnValueSelector convertSortedSegmentRowValuesToMergedRowValues(
- int segmentIndex,
- ColumnValueSelector source
- )
+ protected Indexed<String> getNullDimValue()
{
- IntBuffer converter = dimConversions.get(segmentIndex);
- if (converter == null) {
- return source;
- }
- DimensionSelector sourceDimensionSelector = (DimensionSelector) source;
-
- IndexedInts convertedRow = new IndexedInts()
- {
- @Override
- public int size()
- {
- return sourceDimensionSelector.getRow().size();
- }
-
- @Override
- public int get(int index)
- {
- return converter.get(sourceDimensionSelector.getRow().get(index));
- }
-
- @Override
- public void inspectRuntimeShape(RuntimeShapeInspector inspector)
- {
- inspector.visit("source", source);
- inspector.visit("converter", converter);
- }
- };
- return new DimensionSelector()
- {
- @Override
- public IndexedInts getRow()
- {
- return convertedRow;
- }
-
- @Override
- public void inspectRuntimeShape(RuntimeShapeInspector inspector)
- {
- inspector.visit("convertedRow", convertedRow);
- }
-
- @Override
- public ValueMatcher makeValueMatcher(String value)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public ValueMatcher makeValueMatcher(Predicate<String> predicate)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int getValueCardinality()
- {
- throw new UnsupportedOperationException();
- }
-
- @Nullable
- @Override
- public String lookupName(int id)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean nameLookupPossibleInAdvance()
- {
- throw new UnsupportedOperationException();
- }
-
- @Nullable
- @Override
- public IdLookup idLookup()
- {
- throw new UnsupportedOperationException();
- }
-
- @Nullable
- @Override
- public Object getObject()
- {
- return sourceDimensionSelector.getObject();
- }
-
- @Override
- public Class classOfObject()
- {
- return sourceDimensionSelector.classOfObject();
- }
- };
+ return NULL_STR_DIM_VAL;
}
@Override
- public void processMergedRow(ColumnValueSelector selector) throws IOException
- {
- IndexedInts row = getRow(selector);
- int rowSize = row.size();
- if (rowSize == 0) {
- nullRowsBitmap.add(rowCount);
- } else if (hasNull && isNullRow(row, rowSize)) {
- // If this dimension has the null/empty str in its dictionary, a row
with nulls at all positions should also be
- // added to nullRowBitmap.
- nullRowsBitmap.add(rowCount);
- }
- if (encodedValueSerializer instanceof ColumnarMultiIntsSerializer) {
- ((ColumnarMultiIntsSerializer) encodedValueSerializer).addValues(row);
- } else {
- int value = row.size() == 0 ? 0 : row.get(0);
- ((SingleValueColumnarIntsSerializer)
encodedValueSerializer).addValue(value);
- }
- rowCount++;
- }
-
- private static IndexedInts getRow(ColumnValueSelector s)
- {
- if (s instanceof DimensionSelector) {
- return ((DimensionSelector) s).getRow();
- } else if (s instanceof NilColumnValueSelector) {
- return IndexedInts.empty();
- } else {
- throw new ISE(
- "ColumnValueSelector[%s], only DimensionSelector or
NilColumnValueSelector is supported",
- s.getClass()
- );
- }
- }
-
- private static boolean isNullRow(IndexedInts row, int size)
+ protected ObjectStrategy<String> getObjectStrategy()
{
- for (int i = 0; i < size; i++) {
- if (row.get(i) != 0) {
- return false;
- }
- }
- return true;
+ return GenericIndexed.STRING_STRATEGY;
}
@Override
- public void writeIndexes(@Nullable List<IntBuffer> segmentRowNumConversions)
throws IOException
+ protected String coerceValue(String value)
{
- if (!capabilities.hasBitmapIndexes()) {
- return;
- }
-
- long dimStartTime = System.currentTimeMillis();
- final BitmapSerdeFactory bitmapSerdeFactory =
indexSpec.getBitmapSerdeFactory();
-
- String bmpFilename = StringUtils.format("%s.inverted", dimensionName);
- bitmapWriter = new GenericIndexedWriter<>(
- segmentWriteOutMedium,
- bmpFilename,
- indexSpec.getBitmapSerdeFactory().getObjectStrategy()
- );
- bitmapWriter.open();
- bitmapWriter.setObjectsNotSorted();
-
- BitmapFactory bitmapFactory = bitmapSerdeFactory.getBitmapFactory();
-
- RTree tree = null;
- boolean hasSpatial = capabilities.hasSpatialIndexes();
- if (hasSpatial) {
- spatialWriter = new ByteBufferWriter<>(
- segmentWriteOutMedium,
- new ImmutableRTreeObjectStrategy(bitmapFactory)
- );
- spatialWriter.open();
- tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50, bitmapFactory),
bitmapFactory);
- }
-
- IndexSeeker[] dictIdSeeker = toIndexSeekers(adapters, dimConversions,
dimensionName);
-
- //Iterate all dim values's dictionary id in ascending order which in line
with dim values's compare result.
- for (int dictId = 0; dictId < dictionarySize; dictId++) {
- progress.progress();
- mergeBitmaps(
- segmentRowNumConversions,
- bitmapFactory,
- tree,
- hasSpatial,
- dictIdSeeker,
- dictId
- );
- }
-
- if (hasSpatial) {
- spatialWriter.write(ImmutableRTree.newImmutableFromMutable(tree));
- }
-
- log.debug(
- "Completed dim[%s] inverted with cardinality[%,d] in %,d millis.",
- dimensionName,
- dictionarySize,
- System.currentTimeMillis() - dimStartTime
- );
-
- if (dictionaryMergeIterator != null) {
- dictionaryMergeIterator.close();
- }
- }
-
- void mergeBitmaps(
- @Nullable List<IntBuffer> segmentRowNumConversions,
- BitmapFactory bmpFactory,
- RTree tree,
- boolean hasSpatial,
- IndexSeeker[] dictIdSeeker,
- int dictId
- ) throws IOException
- {
- List<IntIterable> convertedInvertedIndexesToMerge =
Lists.newArrayListWithCapacity(adapters.size());
- for (int j = 0; j < adapters.size(); ++j) {
- int seekedDictId = dictIdSeeker[j].seek(dictId);
- if (seekedDictId != IndexSeeker.NOT_EXIST) {
- IntIterable values;
- if (segmentRowNumConversions != null) {
- values = new ConvertingBitmapValues(
- adapters.get(j).getBitmapValues(dimensionName, seekedDictId),
- segmentRowNumConversions.get(j)
- );
- } else {
- BitmapValues bitmapValues =
adapters.get(j).getBitmapValues(dimensionName, seekedDictId);
- values = bitmapValues::iterator;
- }
- convertedInvertedIndexesToMerge.add(values);
- }
- }
-
- MutableBitmap mergedIndexes = bmpFactory.makeEmptyMutableBitmap();
- List<IntIterator> convertedInvertedIndexesIterators = new
ArrayList<>(convertedInvertedIndexesToMerge.size());
- for (IntIterable convertedInvertedIndexes :
convertedInvertedIndexesToMerge) {
-
convertedInvertedIndexesIterators.add(convertedInvertedIndexes.iterator());
- }
-
- // Merge ascending index iterators into a single one, remove duplicates,
and add to the mergedIndexes bitmap.
- // Merge is needed, because some compacting MutableBitmap implementations
are very inefficient when bits are
- // added not in the ascending order.
- int prevRow = IndexMerger.INVALID_ROW;
- for (IntIterator mergeIt =
IntIteratorUtils.mergeAscending(convertedInvertedIndexesIterators);
- mergeIt.hasNext(); ) {
- int row = mergeIt.nextInt();
- if (row != prevRow && row != IndexMerger.INVALID_ROW) {
- mergedIndexes.add(row);
- }
- prevRow = row;
- }
-
- if (dictId == 0 && firstDictionaryValue == null) {
- mergedIndexes.or(nullRowsBitmap);
- }
-
- bitmapWriter.write(bmpFactory.makeImmutableBitmap(mergedIndexes));
-
- if (hasSpatial) {
- String dimVal = dictionaryWriter.get(dictId);
- if (dimVal != null) {
- List<String> stringCoords = Lists.newArrayList(SPLITTER.split(dimVal));
- float[] coords = new float[stringCoords.size()];
- for (int j = 0; j < coords.length; j++) {
- coords[j] = Float.valueOf(stringCoords.get(j));
- }
- tree.insert(coords, mergedIndexes);
- }
- }
+ return NullHandling.emptyToNullIfNeeded(value);
}
+ @Nullable
@Override
- public boolean canSkip()
+ protected ExtendedIndexesMerger getExtendedIndexesMerger()
{
- return cardinality == 0;
+ return new SpatialIndexesMerger();
}
@Override
@@ -558,155 +133,54 @@ public class StringDimensionMergerV9 implements
DimensionMergerV9
.build();
}
- protected interface IndexSeeker
- {
- int NOT_EXIST = -1;
- int NOT_INIT = -1;
-
- int seek(int dictId);
- }
-
- protected static class IndexSeekerWithoutConversion implements IndexSeeker
- {
- private final int limit;
-
- public IndexSeekerWithoutConversion(int limit)
- {
- this.limit = limit;
- }
-
- @Override
- public int seek(int dictId)
- {
- return dictId < limit ? dictId : NOT_EXIST;
- }
- }
-
/**
- * Get old dictId from new dictId, and only support access in order
+ * Write spatial indexes for string columns that have them
*/
- protected static class IndexSeekerWithConversion implements IndexSeeker
+ public class SpatialIndexesMerger implements ExtendedIndexesMerger
{
- private final IntBuffer dimConversions;
- private int currIndex;
- private int currVal;
- private int lastVal;
-
- IndexSeekerWithConversion(IntBuffer dimConversions)
- {
- this.dimConversions = dimConversions;
- this.currIndex = 0;
- this.currVal = NOT_INIT;
- this.lastVal = NOT_INIT;
- }
+ private RTree tree;
+ private final boolean hasSpatial = capabilities.hasSpatialIndexes();
@Override
- public int seek(int dictId)
+ public void initialize() throws IOException
{
- if (dimConversions == null) {
- return NOT_EXIST;
- }
- if (lastVal != NOT_INIT) {
- if (dictId <= lastVal) {
- throw new ISE(
- "Value dictId[%d] is less than the last value dictId[%d] I have,
cannot be.",
- dictId, lastVal
- );
- }
- return NOT_EXIST;
- }
- if (currVal == NOT_INIT) {
- currVal = dimConversions.get();
- }
- if (currVal == dictId) {
- int ret = currIndex;
- ++currIndex;
- if (dimConversions.hasRemaining()) {
- currVal = dimConversions.get();
- } else {
- lastVal = dictId;
- }
- return ret;
- } else if (currVal < dictId) {
- throw new ISE(
- "Skipped currValue dictId[%d], currIndex[%d]; incoming value
dictId[%d]",
- currVal, currIndex, dictId
+ BitmapFactory bitmapFactory =
indexSpec.getBitmapSerdeFactory().getBitmapFactory();
+ if (hasSpatial) {
+ spatialWriter = new ByteBufferWriter<>(
+ segmentWriteOutMedium,
+ new ImmutableRTreeObjectStrategy(bitmapFactory)
+ );
+ spatialWriter.open();
+ tree = new RTree(
+ 2,
+ new LinearGutmanSplitStrategy(0, 50, bitmapFactory),
+ bitmapFactory
);
- } else {
- return NOT_EXIST;
}
}
- }
- public static class ConvertingBitmapValues implements IntIterable
- {
- private final BitmapValues baseValues;
- private final IntBuffer conversionBuffer;
-
- ConvertingBitmapValues(BitmapValues baseValues, IntBuffer conversionBuffer)
- {
- this.baseValues = baseValues;
- this.conversionBuffer = conversionBuffer;
- }
-
- @Nonnull
@Override
- public IntIterator iterator()
+ public void mergeIndexes(int dictId, MutableBitmap mergedIndexes) throws
IOException
{
- final IntIterator baseIterator = baseValues.iterator();
- return new IntIterator()
- {
- @Override
- public boolean hasNext()
- {
- return baseIterator.hasNext();
- }
-
- @Override
- public int nextInt()
- {
- return conversionBuffer.get(baseIterator.nextInt());
- }
-
- @Override
- public int skip(int n)
- {
- return IntIteratorUtils.skip(baseIterator, n);
- }
- };
- }
- }
-
- protected IndexSeeker[] toIndexSeekers(
- List<IndexableAdapter> adapters,
- ArrayList<IntBuffer> dimConversions,
- String dimension
- )
- {
- IndexSeeker[] seekers = new IndexSeeker[adapters.size()];
- for (int i = 0; i < adapters.size(); i++) {
- IntBuffer dimConversion = dimConversions.get(i);
- if (dimConversion != null) {
- seekers[i] = new IndexSeekerWithConversion((IntBuffer)
dimConversion.asReadOnlyBuffer().rewind());
- } else {
- try (CloseableIndexed<String> dimValueLookup =
adapters.get(i).getDimValueLookup(dimension)) {
- seekers[i] = new IndexSeekerWithoutConversion(dimValueLookup == null
? 0 : dimValueLookup.size());
- }
- catch (IOException e) {
- throw new UncheckedIOException(e);
+ if (hasSpatial) {
+ String dimVal = dictionaryWriter.get(dictId);
+ if (dimVal != null) {
+ List<String> stringCoords =
Lists.newArrayList(SPLITTER.split(dimVal));
+ float[] coords = new float[stringCoords.size()];
+ for (int j = 0; j < coords.length; j++) {
+ coords[j] = Float.valueOf(stringCoords.get(j));
+ }
+ tree.insert(coords, mergedIndexes);
}
}
}
- return seekers;
- }
- private boolean allNull(Indexed<String> dimValues)
- {
- for (int i = 0, size = dimValues.size(); i < size; i++) {
- if (dimValues.get(i) != null) {
- return false;
+ @Override
+ public void write() throws IOException
+ {
+ if (hasSpatial) {
+ spatialWriter.write(ImmutableRTree.newImmutableFromMutable(tree));
}
}
- return true;
}
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java
b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java
index 178dd57..a85102d 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java
@@ -199,7 +199,7 @@ public class GenericIndexed<T> implements
CloseableIndexed<T>, Serializer
return fromIterable(Arrays.asList(objects), strategy);
}
- static GenericIndexed<ResourceHolder<ByteBuffer>> ofCompressedByteBuffers(
+ public static GenericIndexed<ResourceHolder<ByteBuffer>>
ofCompressedByteBuffers(
Iterable<ByteBuffer> buffers,
CompressionStrategy compression,
int bufferSize,
diff --git
a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java
b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java
index d4254ddd..6db18d0 100644
---
a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java
+++
b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java
@@ -70,7 +70,7 @@ public class GenericIndexedWriter<T> implements Serializer
.writeByteArray(x -> x.fileNameByteArray);
- static GenericIndexedWriter<ByteBuffer> ofCompressedByteBuffers(
+ public static GenericIndexedWriter<ByteBuffer> ofCompressedByteBuffers(
final SegmentWriteOutMedium segmentWriteOutMedium,
final String filenameBase,
final CompressionStrategy compressionStrategy,
@@ -86,7 +86,7 @@ public class GenericIndexedWriter<T> implements Serializer
return writer;
}
- static ObjectStrategy<ByteBuffer> compressedByteBuffersWriteObjectStrategy(
+ public static ObjectStrategy<ByteBuffer>
compressedByteBuffersWriteObjectStrategy(
final CompressionStrategy compressionStrategy,
final int bufferSize,
final Closer closer
diff --git
a/processing/src/main/java/org/apache/druid/segment/serde/ComplexColumnPartSerde.java
b/processing/src/main/java/org/apache/druid/segment/serde/ComplexColumnPartSerde.java
index effb4d3..848645d 100644
---
a/processing/src/main/java/org/apache/druid/segment/serde/ComplexColumnPartSerde.java
+++
b/processing/src/main/java/org/apache/druid/segment/serde/ComplexColumnPartSerde.java
@@ -22,7 +22,6 @@ package org.apache.druid.segment.serde;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.column.ColumnCapabilities;
import javax.annotation.Nullable;
@@ -95,7 +94,7 @@ public class ComplexColumnPartSerde implements ColumnPartSerde
@Nullable
private String typeName = null;
@Nullable
- private GenericColumnSerializer delegate = null;
+ private Serializer delegate = null;
public SerializerBuilder withTypeName(final String typeName)
{
@@ -103,7 +102,7 @@ public class ComplexColumnPartSerde implements
ColumnPartSerde
return this;
}
- public SerializerBuilder withDelegate(final GenericColumnSerializer
delegate)
+ public SerializerBuilder withDelegate(final Serializer delegate)
{
this.delegate = delegate;
return this;
diff --git
a/processing/src/test/java/org/apache/druid/segment/DictionaryMergeIteratorTest.java
b/processing/src/test/java/org/apache/druid/segment/DictionaryMergingIteratorTest.java
similarity index 90%
rename from
processing/src/test/java/org/apache/druid/segment/DictionaryMergeIteratorTest.java
rename to
processing/src/test/java/org/apache/druid/segment/DictionaryMergingIteratorTest.java
index c4bf836..e8a8a9b 100644
---
a/processing/src/test/java/org/apache/druid/segment/DictionaryMergeIteratorTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/DictionaryMergingIteratorTest.java
@@ -27,9 +27,8 @@ import org.junit.Test;
/**
*/
-public class DictionaryMergeIteratorTest
+public class DictionaryMergingIteratorTest
{
-
@Test
public void basicTest()
{
@@ -45,7 +44,11 @@ public class DictionaryMergeIteratorTest
Indexed<String> i4 = new ListIndexed<String>(s4);
Indexed<String> i5 = new ListIndexed<String>(s5);
- IndexMerger.DictionaryMergeIterator iterator = new
IndexMerger.DictionaryMergeIterator(new Indexed[]{i1, i2, i3, i4, i5}, false);
+ DictionaryMergingIterator<String> iterator = new
DictionaryMergingIterator<>(
+ new Indexed[]{i1, i2, i3, i4, i5},
+ StringDimensionMergerV9.DICTIONARY_MERGING_COMPARATOR,
+ false
+ );
Assert.assertArrayEquals(new String[]{"a", "b", "c", "d", "e", "f"},
Iterators.toArray(iterator, String.class));
diff --git
a/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java
b/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java
index 9f3d59a..0929e79 100644
--- a/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java
+++ b/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java
@@ -39,7 +39,6 @@ import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import org.apache.druid.query.aggregation.AggregatorFactory;
@@ -71,8 +70,6 @@ import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
-import java.nio.ByteBuffer;
-import java.nio.IntBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -2006,31 +2003,6 @@ public class IndexMergerTestBase extends
InitializedNullHandlingTest
}
@Test
- public void testDictIdSeeker()
- {
- IntBuffer dimConversions = ByteBuffer.allocateDirect(3 *
Integer.BYTES).asIntBuffer();
- dimConversions.put(0);
- dimConversions.put(2);
- dimConversions.put(4);
- IndexMerger.IndexSeeker dictIdSeeker = new
IndexMerger.IndexSeekerWithConversion(
- (IntBuffer) dimConversions.asReadOnlyBuffer().rewind()
- );
- Assert.assertEquals(0, dictIdSeeker.seek(0));
- Assert.assertEquals(-1, dictIdSeeker.seek(1));
- Assert.assertEquals(1, dictIdSeeker.seek(2));
- try {
- dictIdSeeker.seek(5);
- Assert.fail("Only support access in order");
- }
- catch (ISE ise) {
- Assert.assertTrue("Only support access in order", true);
- }
- Assert.assertEquals(-1, dictIdSeeker.seek(3));
- Assert.assertEquals(2, dictIdSeeker.seek(4));
- Assert.assertEquals(-1, dictIdSeeker.seek(5));
- }
-
- @Test
public void testMultiValueHandling() throws Exception
{
InputRow[] rows = new InputRow[]{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]