This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d616a097e5 Add BigDecimal OnHeap/OffHeap mutable dictionaries (#8678)
d616a097e5 is described below
commit d616a097e5b28443b42a9e84113d3fced254444d
Author: nizarhejazi <[email protected]>
AuthorDate: Tue May 10 17:58:25 2022 -0700
Add BigDecimal OnHeap/OffHeap mutable dictionaries (#8678)
---
.../BigDecimalOffHeapMutableDictionary.java | 257 +++++++++++++++++++++
.../BigDecimalOnHeapMutableDictionary.java | 219 ++++++++++++++++++
.../impl/dictionary/MutableDictionaryFactory.java | 5 +
.../impl/dictionary/MutableDictionaryTest.java | 15 +-
4 files changed, 492 insertions(+), 4 deletions(-)
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/BigDecimalOffHeapMutableDictionary.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/BigDecimalOffHeapMutableDictionary.java
new file mode 100644
index 0000000000..9350de77d4
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/BigDecimalOffHeapMutableDictionary.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.realtime.impl.dictionary;
+
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import it.unimi.dsi.fastutil.ints.IntSets;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.Arrays;
+import org.apache.pinot.common.request.context.predicate.RangePredicate;
+import
org.apache.pinot.segment.local.io.writer.impl.MutableOffHeapByteArrayStore;
+import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.utils.BigDecimalUtils;
+
+
+/**
+ * Mutable dictionary for {@code BIG_DECIMAL} values that keeps the serialized form of every value
+ * (as produced by {@link BigDecimalUtils#serialize}) in a {@link MutableOffHeapByteArrayStore}.
+ *
+ * <p>The smallest and largest values passed to {@code index(...)} are tracked in two volatile fields
+ * and exposed through {@link #getMinVal()} / {@link #getMaxVal()}.
+ */
+@SuppressWarnings("Duplicates")
+public class BigDecimalOffHeapMutableDictionary extends BaseOffHeapMutableDictionary {
+  /** Backing store holding one serialized BigDecimal per dictionary id. */
+  private final MutableOffHeapByteArrayStore _byteStore;
+
+  // Both stay null until the first value is indexed.
+  private volatile BigDecimal _min = null;
+  private volatile BigDecimal _max = null;
+
+  /**
+   * @param estimatedCardinality forwarded to the base class and to the byte store
+   * @param maxOverflowHashSize forwarded to the base class
+   * @param memoryManager forwarded to the base class and to the byte store
+   * @param allocationContext forwarded to the base class and to the byte store
+   * @param avgBigDecimalLen forwarded to the byte store (presumably the expected average serialized
+   *                         length in bytes -- confirm against MutableOffHeapByteArrayStore)
+   */
+  public BigDecimalOffHeapMutableDictionary(int estimatedCardinality, int maxOverflowHashSize,
+      PinotDataBufferMemoryManager memoryManager, String allocationContext, int avgBigDecimalLen) {
+    super(estimatedCardinality, maxOverflowHashSize, memoryManager, allocationContext);
+    _byteStore =
+        new MutableOffHeapByteArrayStore(memoryManager, allocationContext, estimatedCardinality, avgBigDecimalLen);
+  }
+
+  /**
+   * Indexes a single value, updating the running min/max first.
+   *
+   * @param value must be a {@link BigDecimal}; anything else fails the cast
+   * @return the dictionary id reported by {@code indexValue}
+   */
+  @Override
+  public int index(Object value) {
+    BigDecimal decimal = (BigDecimal) value;
+    updateMinMax(decimal);
+    byte[] serialized = BigDecimalUtils.serialize(decimal);
+    return indexValue(decimal, serialized);
+  }
+
+  /**
+   * Indexes every element of {@code values} in order.
+   *
+   * @return one dictionary id per input element, at the same position
+   */
+  @Override
+  public int[] index(Object[] values) {
+    int[] ids = new int[values.length];
+    for (int pos = 0; pos < ids.length; pos++) {
+      ids[pos] = index(values[pos]);
+    }
+    return ids;
+  }
+
+  /** Orders two dictionary ids by the numeric order ({@code compareTo}) of their values. */
+  @Override
+  public int compare(int dictId1, int dictId2) {
+    BigDecimal left = getBigDecimalValue(dictId1);
+    BigDecimal right = getBigDecimalValue(dictId2);
+    return left.compareTo(right);
+  }
+
+  /**
+   * Scans every dictionary id and collects those whose value lies within the given range.
+   *
+   * <p>A bound equal to {@link RangePredicate#UNBOUNDED} is open on that side. When {@code lower} is
+   * unbounded, {@code upper} is always parsed as a BigDecimal (it is not checked for being unbounded).
+   *
+   * @return {@link IntSets#EMPTY_SET} when the dictionary is empty, otherwise a new set of matching ids
+   * @throws NumberFormatException if a bound that has to be parsed is not a valid decimal
+   */
+  @Override
+  public IntSet getDictIdsInRange(String lower, String upper, boolean includeLower, boolean includeUpper) {
+    int cardinality = length();
+    if (cardinality == 0) {
+      return IntSets.EMPTY_SET;
+    }
+    IntSet matches = new IntOpenHashSet();
+
+    // Upper is only treated as open when lower is bounded; lower is parsed before upper.
+    boolean openLower = lower.equals(RangePredicate.UNBOUNDED);
+    boolean openUpper = !openLower && upper.equals(RangePredicate.UNBOUNDED);
+    BigDecimal lowerBound = openLower ? null : new BigDecimal(lower);
+    BigDecimal upperBound = openUpper ? null : new BigDecimal(upper);
+
+    for (int id = 0; id < cardinality; id++) {
+      BigDecimal candidate = getBigDecimalValue(id);
+      if (lowerBound != null) {
+        int cmp = candidate.compareTo(lowerBound);
+        if (cmp < 0 || (cmp == 0 && !includeLower)) {
+          continue;
+        }
+      }
+      if (upperBound != null) {
+        int cmp = candidate.compareTo(upperBound);
+        if (cmp > 0 || (cmp == 0 && !includeUpper)) {
+          continue;
+        }
+      }
+      matches.add(id);
+    }
+    return matches;
+  }
+
+  /** @return the smallest value indexed so far, or {@code null} if nothing has been indexed */
+  @Override
+  public BigDecimal getMinVal() {
+    return _min;
+  }
+
+  /** @return the largest value indexed so far, or {@code null} if nothing has been indexed */
+  @Override
+  public BigDecimal getMaxVal() {
+    return _max;
+  }
+
+  /** @return a freshly allocated array of all current values in ascending {@code compareTo} order */
+  @Override
+  public BigDecimal[] getSortedValues() {
+    BigDecimal[] sorted = new BigDecimal[length()];
+    for (int id = 0; id < sorted.length; id++) {
+      sorted[id] = getBigDecimalValue(id);
+    }
+    Arrays.sort(sorted);
+    return sorted;
+  }
+
+  @Override
+  public DataType getValueType() {
+    return DataType.BIG_DECIMAL;
+  }
+
+  /**
+   * Parses {@code stringValue} as a BigDecimal and looks it up via {@code getDictId}.
+   *
+   * @throws NumberFormatException if {@code stringValue} is not a valid decimal
+   */
+  @Override
+  public int indexOf(String stringValue) {
+    BigDecimal decimal = new BigDecimal(stringValue);
+    byte[] serialized = BigDecimalUtils.serialize(decimal);
+    return getDictId(decimal, serialized);
+  }
+
+  /** Same as {@link #getBigDecimalValue(int)}. */
+  public BigDecimal get(int dictId) {
+    return getBigDecimalValue(dictId);
+  }
+
+  // The primitive accessors below use BigDecimal's narrowing conversions.
+
+  @Override
+  public int getIntValue(int dictId) {
+    return getBigDecimalValue(dictId).intValue();
+  }
+
+  @Override
+  public long getLongValue(int dictId) {
+    return getBigDecimalValue(dictId).longValue();
+  }
+
+  @Override
+  public float getFloatValue(int dictId) {
+    return getBigDecimalValue(dictId).floatValue();
+  }
+
+  @Override
+  public double getDoubleValue(int dictId) {
+    return getBigDecimalValue(dictId).doubleValue();
+  }
+
+  /** Reads the stored bytes for {@code dictId} and deserializes them into a new BigDecimal. */
+  @Override
+  public BigDecimal getBigDecimalValue(int dictId) {
+    byte[] stored = _byteStore.get(dictId);
+    return BigDecimalUtils.deserialize(stored);
+  }
+
+  /** @return the value rendered with {@link BigDecimal#toPlainString()} (no exponent notation) */
+  @Override
+  public String getStringValue(int dictId) {
+    return getBigDecimalValue(dictId).toPlainString();
+  }
+
+  /** Appends the serialized bytes to the byte store; {@code dictId} and {@code value} are not used here. */
+  @Override
+  protected void setValue(int dictId, Object value, byte[] serializedValue) {
+    _byteStore.add(serializedValue);
+  }
+
+  /** Compares the serialized bytes against the entry stored at {@code dictId}; {@code value} is not used. */
+  @Override
+  protected boolean equalsValueAt(int dictId, Object value, byte[] serializedValue) {
+    return _byteStore.equalsValueAt(serializedValue, dictId);
+  }
+
+  /** @return the byte store's average value size, truncated to an int */
+  @Override
+  public int getAvgValueSize() {
+    return (int) _byteStore.getAvgValueSize();
+  }
+
+  /** @return off-heap memory reported by the base class plus that of the byte store */
+  @Override
+  public long getTotalOffHeapMemUsed() {
+    long baseUsage = getOffHeapMemUsed();
+    long storeUsage = _byteStore.getTotalOffHeapMemUsed();
+    return baseUsage + storeUsage;
+  }
+
+  /** Closes the backing byte store. */
+  @Override
+  public void doClose()
+      throws IOException {
+    _byteStore.close();
+  }
+
+  /** Widens the tracked [min, max] interval so that it covers {@code value}. */
+  private void updateMinMax(BigDecimal value) {
+    if (_min == null) {
+      // First value seen: it is both the minimum and the maximum.
+      _min = value;
+      _max = value;
+      return;
+    }
+    if (value.compareTo(_min) < 0) {
+      _min = value;
+    }
+    if (value.compareTo(_max) > 0) {
+      _max = value;
+    }
+  }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/BigDecimalOnHeapMutableDictionary.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/BigDecimalOnHeapMutableDictionary.java
new file mode 100644
index 0000000000..0631bdf000
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/BigDecimalOnHeapMutableDictionary.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.realtime.impl.dictionary;
+
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import it.unimi.dsi.fastutil.ints.IntSets;
+import java.math.BigDecimal;
+import java.util.Arrays;
+import org.apache.pinot.common.request.context.predicate.RangePredicate;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.utils.BigDecimalUtils;
+import org.apache.pinot.spi.utils.BytesUtils;
+
+
+/**
+ * Mutable dictionary for {@code BIG_DECIMAL} values that keeps the {@link BigDecimal} objects
+ * themselves (via the {@code indexValue} / {@code get} / {@code getDictId} methods of the base class).
+ *
+ * <p>The smallest and largest values passed to {@code index(...)} are tracked in two volatile fields
+ * and exposed through {@link #getMinVal()} / {@link #getMaxVal()}.
+ *
+ * <p>NOTE(review): ordering here uses {@code compareTo}, which ignores scale, while
+ * {@link BigDecimal#equals} does not. If the base class keys values by {@code equals}, {@code 1.0} and
+ * {@code 1.00} get distinct dictionary ids that compare as equal -- confirm this is intended.
+ */
+@SuppressWarnings("Duplicates")
+public class BigDecimalOnHeapMutableDictionary extends BaseOnHeapMutableDictionary {
+  // Both stay null until the first value is indexed.
+  private volatile BigDecimal _min = null;
+  private volatile BigDecimal _max = null;
+
+  /**
+   * Indexes a single value, updating the running min/max first.
+   *
+   * @param value must be a {@link BigDecimal}; anything else fails the cast
+   * @return the dictionary id reported by {@code indexValue}
+   */
+  @Override
+  public int index(Object value) {
+    BigDecimal bigDecimalValue = (BigDecimal) value;
+    updateMinMax(bigDecimalValue);
+    return indexValue(bigDecimalValue);
+  }
+
+  /**
+   * Indexes every element of {@code values} in order.
+   *
+   * @return one dictionary id per input element, at the same position
+   */
+  @Override
+  public int[] index(Object[] values) {
+    int numValues = values.length;
+    int[] dictIds = new int[numValues];
+    for (int i = 0; i < numValues; i++) {
+      BigDecimal bigDecimalValue = (BigDecimal) values[i];
+      updateMinMax(bigDecimalValue);
+      dictIds[i] = indexValue(bigDecimalValue);
+    }
+    return dictIds;
+  }
+
+  /** Orders two dictionary ids by the numeric order ({@code compareTo}) of their values. */
+  @Override
+  public int compare(int dictId1, int dictId2) {
+    return getBigDecimalValue(dictId1).compareTo(getBigDecimalValue(dictId2));
+  }
+
+  /**
+   * Scans every dictionary id and collects those whose value lies within the given range.
+   *
+   * <p>A bound equal to {@link RangePredicate#UNBOUNDED} is open on that side. When {@code lower} is
+   * unbounded, {@code upper} is always parsed as a BigDecimal (it is not checked for being unbounded).
+   *
+   * @return {@link IntSets#EMPTY_SET} when the dictionary is empty, otherwise a new set of matching ids
+   * @throws NumberFormatException if a bound that has to be parsed is not a valid decimal
+   */
+  @Override
+  public IntSet getDictIdsInRange(String lower, String upper, boolean includeLower, boolean includeUpper) {
+    int numValues = length();
+    if (numValues == 0) {
+      return IntSets.EMPTY_SET;
+    }
+    IntSet dictIds = new IntOpenHashSet();
+
+    // Same parse order as the per-case branches this replaces: lower first, and upper is only
+    // treated as open when lower is bounded.
+    boolean lowerUnbounded = lower.equals(RangePredicate.UNBOUNDED);
+    boolean upperUnbounded = !lowerUnbounded && upper.equals(RangePredicate.UNBOUNDED);
+    BigDecimal lowerValue = lowerUnbounded ? null : new BigDecimal(lower);
+    BigDecimal upperValue = upperUnbounded ? null : new BigDecimal(upper);
+
+    for (int dictId = 0; dictId < numValues; dictId++) {
+      if (isInRange(getBigDecimalValue(dictId), lowerValue, upperValue, includeLower, includeUpper)) {
+        dictIds.add(dictId);
+      }
+    }
+    return dictIds;
+  }
+
+  /** Range check with optional bounds; a {@code null} bound means that side is open. */
+  private static boolean isInRange(BigDecimal value, BigDecimal lowerValue, BigDecimal upperValue,
+      boolean includeLower, boolean includeUpper) {
+    if (lowerValue != null) {
+      int cmp = value.compareTo(lowerValue);
+      if (includeLower ? cmp < 0 : cmp <= 0) {
+        return false;
+      }
+    }
+    if (upperValue != null) {
+      int cmp = value.compareTo(upperValue);
+      if (includeUpper ? cmp > 0 : cmp >= 0) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /** @return the smallest value indexed so far, or {@code null} if nothing has been indexed */
+  @Override
+  public BigDecimal getMinVal() {
+    return _min;
+  }
+
+  /** @return the largest value indexed so far, or {@code null} if nothing has been indexed */
+  @Override
+  public BigDecimal getMaxVal() {
+    return _max;
+  }
+
+  /** @return a freshly allocated array of all current values in ascending {@code compareTo} order */
+  @Override
+  public BigDecimal[] getSortedValues() {
+    int numValues = length();
+    BigDecimal[] sortedValues = new BigDecimal[numValues];
+
+    for (int dictId = 0; dictId < numValues; dictId++) {
+      sortedValues[dictId] = getBigDecimalValue(dictId);
+    }
+
+    Arrays.sort(sortedValues);
+    return sortedValues;
+  }
+
+  @Override
+  public DataType getValueType() {
+    return DataType.BIG_DECIMAL;
+  }
+
+  /**
+   * Parses {@code stringValue} as a BigDecimal and looks it up via {@code getDictId}.
+   *
+   * @throws NumberFormatException if {@code stringValue} is not a valid decimal
+   */
+  @Override
+  public int indexOf(String stringValue) {
+    return getDictId(new BigDecimal(stringValue));
+  }
+
+  // The primitive accessors below use BigDecimal's narrowing conversions.
+
+  @Override
+  public int getIntValue(int dictId) {
+    return getBigDecimalValue(dictId).intValue();
+  }
+
+  @Override
+  public long getLongValue(int dictId) {
+    return getBigDecimalValue(dictId).longValue();
+  }
+
+  @Override
+  public float getFloatValue(int dictId) {
+    return getBigDecimalValue(dictId).floatValue();
+  }
+
+  @Override
+  public double getDoubleValue(int dictId) {
+    return getBigDecimalValue(dictId).doubleValue();
+  }
+
+  /** @return the value stored for {@code dictId}, cast to BigDecimal */
+  @Override
+  public BigDecimal getBigDecimalValue(int dictId) {
+    return (BigDecimal) get(dictId);
+  }
+
+  /** @return the value rendered with {@link BigDecimal#toPlainString()} (no exponent notation) */
+  @Override
+  public String getStringValue(int dictId) {
+    return getBigDecimalValue(dictId).toPlainString();
+  }
+
+  /**
+   * Returns the value in its serialized BigDecimal byte form.
+   *
+   * <p>This uses {@link BigDecimalUtils#serialize}, the same encoding the off-heap BigDecimal
+   * dictionary stores. It previously went through {@code BytesUtils.toBytes(String)}, which decodes a
+   * hex-encoded string and therefore is not applicable to a plain decimal string such as "0.5".
+   */
+  @Override
+  public byte[] getBytesValue(int dictId) {
+    return BigDecimalUtils.serialize(getBigDecimalValue(dictId));
+  }
+
+  /** Widens the tracked [min, max] interval so that it covers {@code value}. */
+  private void updateMinMax(BigDecimal value) {
+    if (_min == null) {
+      // First value seen: it is both the minimum and the maximum.
+      _min = value;
+      _max = value;
+    } else {
+      if (value.compareTo(_min) < 0) {
+        _min = value;
+      }
+      if (value.compareTo(_max) > 0) {
+        _max = value;
+      }
+    }
+  }
+}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/MutableDictionaryFactory.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/MutableDictionaryFactory.java
index 719ab0b8e5..bcac63134c 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/MutableDictionaryFactory.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/dictionary/MutableDictionaryFactory.java
@@ -41,6 +41,9 @@ public class MutableDictionaryFactory {
return new FloatOffHeapMutableDictionary(cardinality,
maxOverflowSize, memoryManager, allocationContext);
case DOUBLE:
return new DoubleOffHeapMutableDictionary(cardinality,
maxOverflowSize, memoryManager, allocationContext);
+ case BIG_DECIMAL:
+ return new BigDecimalOffHeapMutableDictionary(cardinality,
maxOverflowSize, memoryManager, allocationContext,
+ avgLength);
case STRING:
return new StringOffHeapMutableDictionary(cardinality,
maxOverflowSize, memoryManager, allocationContext,
avgLength);
@@ -61,6 +64,8 @@ public class MutableDictionaryFactory {
return new FloatOnHeapMutableDictionary();
case DOUBLE:
return new DoubleOnHeapMutableDictionary();
+ case BIG_DECIMAL:
+ return new BigDecimalOnHeapMutableDictionary();
case STRING:
return new StringOnHeapMutableDictionary();
case BYTES:
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/dictionary/MutableDictionaryTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/dictionary/MutableDictionaryTest.java
index c1882401e2..9f717689b1 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/dictionary/MutableDictionaryTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/dictionary/MutableDictionaryTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.segment.local.realtime.impl.dictionary;
+import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -51,7 +52,7 @@ public class MutableDictionaryTest {
private static final int NUM_READERS = 3;
private static final FieldSpec.DataType[] DATA_TYPES = {
FieldSpec.DataType.INT, FieldSpec.DataType.LONG,
FieldSpec.DataType.FLOAT, FieldSpec.DataType.DOUBLE,
- FieldSpec.DataType.STRING, FieldSpec.DataType.BYTES
+ FieldSpec.DataType.BIG_DECIMAL, FieldSpec.DataType.STRING,
FieldSpec.DataType.BYTES
};
private static final long RANDOM_SEED = System.currentTimeMillis();
private static final Random RANDOM = new Random(RANDOM_SEED);
@@ -194,9 +195,10 @@ public class MutableDictionaryTest {
// Test sorted values.
Collections.sort(expectedSortedValues);
Object sortedValues = dictionary.getSortedValues();
- List<Comparable> actualSortedValues =
- (dataType.equals(FieldSpec.DataType.STRING) ||
dataType.equals(FieldSpec.DataType.BYTES)) ? Arrays
- .asList((Comparable[]) dictionary.getSortedValues()) :
primitiveArrayToList(dataType, sortedValues);
+ List<Comparable> actualSortedValues =
(dataType.equals(FieldSpec.DataType.STRING)
+ || dataType.equals(FieldSpec.DataType.BYTES) ||
dataType.equals(FieldSpec.DataType.BIG_DECIMAL))
+ ? Arrays.asList((Comparable[]) dictionary.getSortedValues())
+ : primitiveArrayToList(dataType, sortedValues);
Assert.assertEquals(actualSortedValues, expectedSortedValues);
Assert.assertEquals(dictionary.getDictIdsInRange(expectedMin.toString(),
expectedMax.toString(), true, true).size(),
@@ -214,6 +216,9 @@ public class MutableDictionaryTest {
return new FloatOffHeapMutableDictionary(estCardinality,
maxOverflowSize, _memoryManager, "floatColumn");
case DOUBLE:
return new DoubleOffHeapMutableDictionary(estCardinality,
maxOverflowSize, _memoryManager, "doubleColumn");
+ case BIG_DECIMAL:
+ return new BigDecimalOffHeapMutableDictionary(estCardinality,
maxOverflowSize, _memoryManager,
+ "bigDecimalColumn", 32);
case STRING:
return new StringOffHeapMutableDictionary(estCardinality,
maxOverflowSize, _memoryManager, "stringColumn", 32);
case BYTES:
@@ -233,6 +238,8 @@ public class MutableDictionaryTest {
return RANDOM.nextFloat();
case DOUBLE:
return RANDOM.nextDouble();
+ case BIG_DECIMAL:
+ return BigDecimal.valueOf(RANDOM.nextDouble());
case STRING:
return RandomStringUtils.randomAscii(RANDOM.nextInt(1024));
case BYTES:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]