This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e81e530 Support STRING and BYTES for no dictionary columns in
realtime consuming segments (#4791)
e81e530 is described below
commit e81e5303a2e6157bb3530c621c3b8ce748788a83
Author: Sidd <[email protected]>
AuthorDate: Sun Jan 12 11:29:44 2020 -0800
Support STRING and BYTES for no dictionary columns in realtime consuming
segments (#4791)
---
.../indexsegment/mutable/MutableSegmentImpl.java | 92 ++++++++++++----
.../BaseSingleColumnSingleValueReaderWriter.java | 10 +-
...xedByteSingleColumnSingleValueReaderWriter.java | 10 ++
...VarByteSingleColumnSingleValueReaderWriter.java | 120 +++++++++++++++++++++
.../stats/RealtimeNoDictionaryColStatistics.java | 23 ++--
.../index/data/source/ColumnDataSource.java | 4 +
.../core/common/RealtimeNoDictionaryTest.java | 49 +++++++++
...yteSingleColumnSingleValueReaderWriterTest.java | 96 +++++++++++++++++
.../java/org/apache/pinot/spi/data/FieldSpec.java | 8 ++
9 files changed, 375 insertions(+), 37 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
index 416f9e1..dd451cf 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
@@ -37,6 +37,7 @@ import org.apache.pinot.core.io.reader.DataFileReader;
import org.apache.pinot.core.io.readerwriter.PinotDataBufferMemoryManager;
import
org.apache.pinot.core.io.readerwriter.impl.FixedByteSingleColumnMultiValueReaderWriter;
import
org.apache.pinot.core.io.readerwriter.impl.FixedByteSingleColumnSingleValueReaderWriter;
+import
org.apache.pinot.core.io.readerwriter.impl.VarByteSingleColumnSingleValueReaderWriter;
import org.apache.pinot.core.realtime.impl.RealtimeSegmentConfig;
import org.apache.pinot.core.realtime.impl.RealtimeSegmentStatsHistory;
import org.apache.pinot.core.realtime.impl.dictionary.BaseMutableDictionary;
@@ -76,6 +77,9 @@ public class MutableSegmentImpl implements MutableSegment {
private static final int MIN_ROWS_TO_INDEX = 1000_000; // Min size of
recordIdMap for updatable metrics.
private static final int MIN_RECORD_ID_MAP_CACHE_SIZE = 10000; // Min
overflow map size for updatable metrics.
+ private static final int
NODICT_VARIABLE_WIDTH_ESTIMATED_AVERAGE_VALUE_LENGTH_DEFAULT = 100;
+ private static final int
NODICT_VARIABLE_WIDTH_ESTIMATED_NUMBER_OF_VALUES_DEFAULT = 100_000;
+
private final Logger _logger;
private final long _startTimeMillis = System.currentTimeMillis();
@@ -183,15 +187,22 @@ public class MutableSegmentImpl implements MutableSegment
{
_maxNumValuesMap.put(column, 0);
// Check whether to generate raw index for the column while consuming
- // Only support generating raw index on single-value non-string columns
that do not have inverted index while
+ // Only support generating raw index on single-value columns that do not
have inverted index while
// consuming. After consumption completes and the segment is built, all
single-value columns can have raw index
FieldSpec.DataType dataType = fieldSpec.getDataType();
- int indexColumnSize = FieldSpec.DataType.INT.size();
- if (noDictionaryColumns.contains(column) &&
fieldSpec.isSingleValueField()
- && dataType != FieldSpec.DataType.STRING &&
!invertedIndexColumns.contains(column)) {
- // No dictionary
- indexColumnSize = dataType.size();
+ boolean isFixedWidthColumn = dataType.isFixedWidth();
+ int forwardIndexColumnSize = -1;
+ if (isNoDictionaryColumn(noDictionaryColumns, invertedIndexColumns,
fieldSpec, column)) {
+ // no dictionary
+ // each forward index entry will be equal to size of data for that row
+ // For INT, LONG, FLOAT, DOUBLE it is equal to the number of fixed
bytes used to store the value.
+ if (isFixedWidthColumn) {
+ forwardIndexColumnSize = dataType.size();
+ }
} else {
+ // dictionary encoded index
+ // each forward index entry will contain a 4 byte dictionary ID
+ forwardIndexColumnSize = FieldSpec.DataType.INT.size();
int dictionaryColumnSize;
if (dataType == FieldSpec.DataType.STRING) {
dictionaryColumnSize = _statsHistory.getEstimatedAvgColSize(column);
@@ -211,19 +222,41 @@ public class MutableSegmentImpl implements MutableSegment
{
}
DataFileReader indexReaderWriter;
- if (fieldSpec.isSingleValueField()) {
- String allocationContext =
- buildAllocationContext(_segmentName, column,
V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION);
- indexReaderWriter = new
FixedByteSingleColumnSingleValueReaderWriter(_capacity, indexColumnSize,
_memoryManager,
- allocationContext);
+
+ // create forward index reader/writer
+ if (forwardIndexColumnSize == -1) {
+ // for STRING/BYTES SV column, we support raw index in consuming
segments
+ // RealtimeSegmentStatsHistory does not have the stats for
no-dictionary columns
+ // from previous consuming segments
+ // TODO: Add support for updating RealtimeSegmentStatsHistory with
average column value size for no dictionary columns as well
+ // TODO: Use the stats to get estimated average length
+ // Use a smaller capacity as opposed to segment flush size
+ int initialCapacity = Math.min(_capacity,
NODICT_VARIABLE_WIDTH_ESTIMATED_NUMBER_OF_VALUES_DEFAULT);
+ String allocationContext = buildAllocationContext(_segmentName,
column, V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION);
+ indexReaderWriter = new
VarByteSingleColumnSingleValueReaderWriter(_memoryManager, allocationContext,
initialCapacity, NODICT_VARIABLE_WIDTH_ESTIMATED_AVERAGE_VALUE_LENGTH_DEFAULT);
} else {
- // TODO: Start with a smaller capacity on
FixedByteSingleColumnMultiValueReaderWriter and let it expand
- String allocationContext =
- buildAllocationContext(_segmentName, column,
V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION);
- indexReaderWriter =
- new
FixedByteSingleColumnMultiValueReaderWriter(MAX_MULTI_VALUES_PER_ROW,
avgNumMultiValues, _capacity,
- indexColumnSize, _memoryManager, allocationContext);
+ // two possible cases can lead here:
+ // (1) dictionary encoded forward index
+ // (2) raw forward index for fixed width types -- INT, LONG, FLOAT,
DOUBLE
+ if (fieldSpec.isSingleValueField()) {
+ // SV column -- both dictionary encoded and raw index are supported
on SV
+ // columns for both fixed and variable width types
+ String allocationContext =
+ buildAllocationContext(_segmentName, column,
V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION);
+ indexReaderWriter = new
FixedByteSingleColumnSingleValueReaderWriter(_capacity, forwardIndexColumnSize,
_memoryManager,
+ allocationContext);
+ } else {
+ // MV column -- only dictionary encoded index is supported on MV
columns
+ // for both fixed and variable width types
+ // TODO: Start with a smaller capacity on
FixedByteSingleColumnMultiValueReaderWriter and let it expand
+ String allocationContext =
+ buildAllocationContext(_segmentName, column,
V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION);
+ indexReaderWriter =
+ new
FixedByteSingleColumnMultiValueReaderWriter(MAX_MULTI_VALUES_PER_ROW,
avgNumMultiValues, _capacity,
+ forwardIndexColumnSize, _memoryManager, allocationContext);
+ }
}
+
_indexReaderWriterMap.put(column, indexReaderWriter);
if (invertedIndexColumns.contains(column)) {
@@ -240,6 +273,20 @@ public class MutableSegmentImpl implements MutableSegment {
_recordIdMap = enableMetricsAggregationIfPossible(config,
noDictionaryColumns);
}
+ /**
+ * Decide whether a given column should be dictionary encoded or not
+ * @param noDictionaryColumns no dictionary column set
+ * @param invertedIndexColumns inverted index column set
+ * @param fieldSpec field spec of column
+ * @param column column name
+ * @return true if column is no-dictionary, false if dictionary encoded
+ */
+ private boolean isNoDictionaryColumn(Set<String> noDictionaryColumns,
Set<String> invertedIndexColumns,
+ FieldSpec fieldSpec, String column) {
+ return noDictionaryColumns.contains(column) &&
fieldSpec.isSingleValueField()
+ && !invertedIndexColumns.contains(column);
+ }
+
public SegmentPartitionConfig getSegmentPartitionConfig() {
return _segmentPartitionConfig;
}
@@ -336,14 +383,15 @@ public class MutableSegmentImpl implements MutableSegment
{
String column = fieldSpec.getName();
Object value = row.getValue(column);
if (fieldSpec.isSingleValueField()) {
+ // SV column
FixedByteSingleColumnSingleValueReaderWriter indexReaderWriter =
(FixedByteSingleColumnSingleValueReaderWriter)
_indexReaderWriterMap.get(column);
Integer dictId = (Integer) dictIdMap.get(column);
if (dictId != null) {
- // Column with dictionary
+ // SV column with dictionary
indexReaderWriter.setInt(docId, dictId);
} else {
- // No-dictionary column
+ // No-dictionary SV column
FieldSpec.DataType dataType = fieldSpec.getDataType();
switch (dataType) {
case INT:
@@ -358,6 +406,12 @@ public class MutableSegmentImpl implements MutableSegment {
case DOUBLE:
indexReaderWriter.setDouble(docId, (Double) value);
break;
+ case STRING:
+ indexReaderWriter.setString(docId, (String) value);
+ break;
+ case BYTES:
+ indexReaderWriter.setBytes(docId, (byte[]) value);
+ break;
default:
throw new UnsupportedOperationException(
"Unsupported data type: " + dataType + " for no-dictionary
column: " + column);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/io/readerwriter/BaseSingleColumnSingleValueReaderWriter.java
b/pinot-core/src/main/java/org/apache/pinot/core/io/readerwriter/BaseSingleColumnSingleValueReaderWriter.java
index 73ea205..7dc8758 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/io/readerwriter/BaseSingleColumnSingleValueReaderWriter.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/io/readerwriter/BaseSingleColumnSingleValueReaderWriter.java
@@ -92,7 +92,7 @@ public abstract class
BaseSingleColumnSingleValueReaderWriter<T extends ReaderCo
@Override
public byte[] getBytes(int row, ReaderContext context) {
- throw new UnsupportedOperationException();
+ return getBytes(row);
}
@Override
@@ -144,4 +144,12 @@ public abstract class
BaseSingleColumnSingleValueReaderWriter<T extends ReaderCo
public void setBytes(int row, byte[] bytes) {
throw new UnsupportedOperationException();
}
+
+ public int getLengthOfShortestElement() {
+ throw new UnsupportedOperationException();
+ }
+
+ public int getLengthOfLongestElement() {
+ throw new UnsupportedOperationException();
+ }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/io/readerwriter/impl/FixedByteSingleColumnSingleValueReaderWriter.java
b/pinot-core/src/main/java/org/apache/pinot/core/io/readerwriter/impl/FixedByteSingleColumnSingleValueReaderWriter.java
index 3e3dde4..e3130e5 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/io/readerwriter/impl/FixedByteSingleColumnSingleValueReaderWriter.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/io/readerwriter/impl/FixedByteSingleColumnSingleValueReaderWriter.java
@@ -72,6 +72,16 @@ public class FixedByteSingleColumnSingleValueReaderWriter
extends BaseSingleColu
}
@Override
+ public int getLengthOfShortestElement() {
+ return _columnSizesInBytes;
+ }
+
+ @Override
+ public int getLengthOfLongestElement() {
+ return _columnSizesInBytes;
+ }
+
+ @Override
public void close()
throws IOException {
for (ReaderWithOffset reader : _readers) {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/io/readerwriter/impl/VarByteSingleColumnSingleValueReaderWriter.java
b/pinot-core/src/main/java/org/apache/pinot/core/io/readerwriter/impl/VarByteSingleColumnSingleValueReaderWriter.java
new file mode 100644
index 0000000..0327b12
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/io/readerwriter/impl/VarByteSingleColumnSingleValueReaderWriter.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.io.readerwriter.impl;
+
+import java.io.IOException;
+import org.apache.pinot.common.utils.StringUtil;
+import
org.apache.pinot.core.io.readerwriter.BaseSingleColumnSingleValueReaderWriter;
+import org.apache.pinot.core.io.readerwriter.PinotDataBufferMemoryManager;
+import org.apache.pinot.core.io.writer.impl.MutableOffHeapByteArrayStore;
+
+public class VarByteSingleColumnSingleValueReaderWriter extends
BaseSingleColumnSingleValueReaderWriter {
+ private final MutableOffHeapByteArrayStore _byteArrayStore;
+ private int _lengthOfShortestElement;
+ private int _lengthOfLongestElement;
+
+ public VarByteSingleColumnSingleValueReaderWriter(
+ PinotDataBufferMemoryManager memoryManager,
+ String allocationContext,
+ int estimatedMaxNumberOfValues,
+ int estimatedAverageStringLength) {
+ _byteArrayStore = new MutableOffHeapByteArrayStore(memoryManager,
allocationContext, estimatedMaxNumberOfValues, estimatedAverageStringLength);
+ _lengthOfShortestElement = Integer.MAX_VALUE;
+ _lengthOfLongestElement = Integer.MIN_VALUE;
+ }
+
+ @Override
+ public int getLengthOfShortestElement() {
+ return _lengthOfShortestElement;
+ }
+
+ @Override
+ public int getLengthOfLongestElement() {
+ return _lengthOfLongestElement;
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ _byteArrayStore.close();
+ }
+
+ @Override
+ public void setInt(int row, int i) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getInt(int row) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setLong(int row, long l) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getLong(int row) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setFloat(int row, float f) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public float getFloat(int row) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setDouble(int row, double d) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public double getDouble(int row) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setString(int row, String val) {
+ byte[] serializedValue = StringUtil.encodeUtf8(val);
+ setBytes(row, serializedValue);
+ }
+
+ @Override
+ public String getString(int row) {
+ return StringUtil.decodeUtf8(_byteArrayStore.get(row));
+ }
+
+ @Override
+ public void setBytes(int row, byte[] value) {
+ _byteArrayStore.add(value);
+ _lengthOfLongestElement = Math.max(_lengthOfLongestElement, value.length);
+ _lengthOfShortestElement = Math.min(_lengthOfShortestElement,
value.length);
+ }
+
+ @Override
+ public byte[] getBytes(int row) {
+ return _byteArrayStore.get(row);
+ }
+}
\ No newline at end of file
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeNoDictionaryColStatistics.java
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeNoDictionaryColStatistics.java
index ea85788..293311c 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeNoDictionaryColStatistics.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeNoDictionaryColStatistics.java
@@ -22,6 +22,7 @@ import java.util.Set;
import org.apache.pinot.core.common.Block;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.data.partition.PartitionFunction;
+import
org.apache.pinot.core.io.readerwriter.BaseSingleColumnSingleValueReaderWriter;
import org.apache.pinot.core.segment.creator.ColumnStatistics;
import org.apache.pinot.core.segment.index.data.source.ColumnDataSource;
@@ -30,12 +31,15 @@ import static
org.apache.pinot.core.common.Constants.UNKNOWN_CARDINALITY;
public class RealtimeNoDictionaryColStatistics implements ColumnStatistics {
+ final BaseSingleColumnSingleValueReaderWriter _forwardIndex;
final BlockValSet _blockValSet;
final int _numDocIds;
final String _operatorName;
public RealtimeNoDictionaryColStatistics(ColumnDataSource dataSource) {
_operatorName = dataSource.getOperatorName();
+ // no-dictionary is only supported for SV columns
+ _forwardIndex =
(BaseSingleColumnSingleValueReaderWriter)dataSource.getForwardIndex();
Block block = dataSource.nextBlock();
_numDocIds = block.getMetadata().getEndDocId() + 1;
_blockValSet = block.getBlockValueSet();
@@ -63,12 +67,12 @@ public class RealtimeNoDictionaryColStatistics implements
ColumnStatistics {
@Override
public int getLengthOfShortestElement() {
- return lengthOfDataType(); // Only fixed length data types supported.
+ return _forwardIndex.getLengthOfShortestElement();
}
@Override
public int getLengthOfLargestElement() {
- return lengthOfDataType(); // Only fixed length data types supported.
+ return _forwardIndex.getLengthOfLongestElement();
}
@Override
@@ -105,19 +109,4 @@ public class RealtimeNoDictionaryColStatistics implements
ColumnStatistics {
public Set<Integer> getPartitions() {
return null;
}
-
- private int lengthOfDataType() {
- switch (_blockValSet.getValueType()) {
- case INT:
- return Integer.BYTES;
- case LONG:
- return Long.BYTES;
- case FLOAT:
- return Float.BYTES;
- case DOUBLE:
- return Double.BYTES;
- default:
- throw new UnsupportedOperationException();
- }
- }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/data/source/ColumnDataSource.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/data/source/ColumnDataSource.java
index c75a69e..7fdf7a7 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/data/source/ColumnDataSource.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/data/source/ColumnDataSource.java
@@ -190,4 +190,8 @@ public final class ColumnDataSource extends DataSource {
public String getOperatorName() {
return _operatorName;
}
+
+ public DataFileReader getForwardIndex() {
+ return _forwardIndex;
+ }
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/common/RealtimeNoDictionaryTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/common/RealtimeNoDictionaryTest.java
index 3e1e997..ce6eb4f 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/common/RealtimeNoDictionaryTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/common/RealtimeNoDictionaryTest.java
@@ -21,10 +21,14 @@ package org.apache.pinot.core.common;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
+import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.MetricFieldSpec;
+import org.apache.commons.lang.RandomStringUtils;
import org.apache.pinot.core.io.readerwriter.PinotDataBufferMemoryManager;
import
org.apache.pinot.core.io.readerwriter.impl.FixedByteSingleColumnSingleValueReaderWriter;
+import
org.apache.pinot.core.io.readerwriter.impl.VarByteSingleColumnSingleValueReaderWriter;
import org.apache.pinot.core.io.writer.impl.DirectMemoryManager;
import org.apache.pinot.core.segment.index.data.source.ColumnDataSource;
import org.testng.Assert;
@@ -38,11 +42,15 @@ public class RealtimeNoDictionaryTest {
private static final String LONG_COL_NAME = "longcol";
private static final String FLOAT_COL_NAME = "floatcol";
private static final String DOUBLE_COL_NAME = "doublecol";
+ private static final String STRING_COL_NAME = "stringcol";
+ private static final String BYTES_COL_NAME = "bytescol";
private static final int NUM_ROWS = 1000;
private int[] _intVals = new int[NUM_ROWS];
private long[] _longVals = new long[NUM_ROWS];
private float[] _floatVals = new float[NUM_ROWS];
private double[] _doubleVals = new double[NUM_ROWS];
+ private String[] _stringVals = new String[NUM_ROWS];
+
private Random _random;
private PinotDataBufferMemoryManager _memoryManager;
@@ -63,6 +71,8 @@ public class RealtimeNoDictionaryTest {
FieldSpec longSpec = new MetricFieldSpec(LONG_COL_NAME,
FieldSpec.DataType.LONG);
FieldSpec floatSpec = new MetricFieldSpec(FLOAT_COL_NAME,
FieldSpec.DataType.FLOAT);
FieldSpec doubleSpec = new MetricFieldSpec(DOUBLE_COL_NAME,
FieldSpec.DataType.DOUBLE);
+ FieldSpec stringSpec = new DimensionFieldSpec(STRING_COL_NAME,
FieldSpec.DataType.STRING, true);
+ FieldSpec bytesSpec = new DimensionFieldSpec(BYTES_COL_NAME,
FieldSpec.DataType.BYTES, true);
_random = new Random(seed);
FixedByteSingleColumnSingleValueReaderWriter intRawIndex =
@@ -77,6 +87,10 @@ public class RealtimeNoDictionaryTest {
FixedByteSingleColumnSingleValueReaderWriter doubleRawIndex =
new
FixedByteSingleColumnSingleValueReaderWriter(_random.nextInt(NUM_ROWS) + 1,
Double.BYTES, _memoryManager,
"double");
+ VarByteSingleColumnSingleValueReaderWriter stringRawIndex =
+ new VarByteSingleColumnSingleValueReaderWriter(_memoryManager,
"StringColumn", 512, 30);
+ VarByteSingleColumnSingleValueReaderWriter bytesRawIndex =
+ new VarByteSingleColumnSingleValueReaderWriter(_memoryManager,
"BytesColumn", 512, 30);
for (int i = 0; i < NUM_ROWS; i++) {
_intVals[i] = _random.nextInt();
@@ -87,6 +101,11 @@ public class RealtimeNoDictionaryTest {
floatRawIndex.setFloat(i, _floatVals[i]);
_doubleVals[i] = _random.nextDouble();
doubleRawIndex.setDouble(i, _doubleVals[i]);
+ // generate a random string of length between 10 and 99 (inclusive)
+ int length = 10 + _random.nextInt(100 - 10);
+ _stringVals[i] = RandomStringUtils.randomAlphanumeric(length);
+ stringRawIndex.setString(i, _stringVals[i]);
+ bytesRawIndex.setBytes(i, StringUtil.encodeUtf8(_stringVals[i]));
}
Map<String, DataSource> dataSourceBlock = new HashMap<>();
@@ -95,11 +114,41 @@ public class RealtimeNoDictionaryTest {
dataSourceBlock.put(FLOAT_COL_NAME, new ColumnDataSource(floatSpec,
NUM_ROWS, 0, floatRawIndex, null, null, null, null));
dataSourceBlock
.put(DOUBLE_COL_NAME, new ColumnDataSource(doubleSpec, NUM_ROWS, 0,
doubleRawIndex, null, null, null, null));
+ dataSourceBlock
+ .put(STRING_COL_NAME, new ColumnDataSource(stringSpec, NUM_ROWS, 0,
stringRawIndex, null, null, null, null));
+ dataSourceBlock
+ .put(BYTES_COL_NAME, new ColumnDataSource(bytesSpec, NUM_ROWS, 0,
bytesRawIndex, null, null, null, null));
return new DataFetcher(dataSourceBlock);
}
@Test
+ public void testStringAndBytes() {
+ long seed = new Random().nextLong();
+ DataFetcher dataFetcher = makeDataFetcher(seed);
+ int[] docIds = new int[NUM_ROWS];
+ int numDocIds = _random.nextInt(NUM_ROWS) + 1;
+ for (int i = 0; i < numDocIds; i++) {
+ docIds[i] = _random.nextInt(NUM_ROWS);
+ }
+ try {
+ String[] stringValues = new String[NUM_ROWS];
+ dataFetcher.fetchStringValues(STRING_COL_NAME, docIds, numDocIds,
stringValues);
+ for (int i = 0; i < numDocIds; i++) {
+ Assert.assertEquals(stringValues[i], _stringVals[docIds[i]], " for row
" + docIds[i]);
+ }
+ byte[][] byteValues = new byte[NUM_ROWS][];
+ dataFetcher.fetchBytesValues(BYTES_COL_NAME, docIds, numDocIds,
byteValues);
+ for (int i = 0; i < numDocIds; i++) {
+ Assert.assertEquals(StringUtil.decodeUtf8(byteValues[i]),
_stringVals[docIds[i]], " for row " + docIds[i]);
+ }
+ } catch (Throwable t) {
+ t.printStackTrace();
+ Assert.fail("Failed with seed " + seed);
+ }
+ }
+
+ @Test
public void testIntColumn() {
long seed = new Random().nextLong();
DataFetcher dataFetcher = makeDataFetcher(seed);
diff --git
a/pinot-core/src/test/java/org/apache/pinot/index/readerwriter/VarByteSingleColumnSingleValueReaderWriterTest.java
b/pinot-core/src/test/java/org/apache/pinot/index/readerwriter/VarByteSingleColumnSingleValueReaderWriterTest.java
new file mode 100644
index 0000000..9458243
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/index/readerwriter/VarByteSingleColumnSingleValueReaderWriterTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.index.readerwriter;
+
+import java.io.IOException;
+import java.util.Random;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.core.io.readerwriter.PinotDataBufferMemoryManager;
+import
org.apache.pinot.core.io.readerwriter.impl.VarByteSingleColumnSingleValueReaderWriter;
+import org.apache.pinot.core.io.writer.impl.DirectMemoryManager;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class VarByteSingleColumnSingleValueReaderWriterTest {
+ private PinotDataBufferMemoryManager _memoryManager;
+
+ @BeforeClass
+ public void setUp() {
+ _memoryManager = new
DirectMemoryManager(VarByteSingleColumnSingleValueReaderWriterTest.class.getName());
+ }
+
+ @AfterClass
+ public void tearDown()
+ throws Exception {
+ _memoryManager.close();
+ }
+
+ @Test
+ public void testString()
+ throws IOException {
+ // use a deliberately small initial capacity and an arbitrary avg string length;
+ // the test then writes 1000 random-length strings, far more than that capacity
+ int initialCapacity = 5;
+ int estimatedAvgStringLength = 30;
+ try (VarByteSingleColumnSingleValueReaderWriter readerWriter =
+ new VarByteSingleColumnSingleValueReaderWriter(_memoryManager,
"StringColumn", initialCapacity, estimatedAvgStringLength)) {
+ int rows = 1000;
+ Random random = new Random();
+ String[] data = new String[rows];
+
+ for (int i = 0; i < rows; i++) {
+ // generate a random string of length between 10 and 99 (inclusive)
+ int length = 10 + random.nextInt(100 - 10);
+ data[i] = RandomStringUtils.randomAlphanumeric(length);
+ readerWriter.setString(i, data[i]);
+ }
+
+ for (int i = 0; i < rows; i++) {
+ Assert.assertEquals(readerWriter.getString(i), data[i]);
+ }
+ }
+ }
+
+ @Test
+ public void testBytes()
+ throws IOException {
+ int initialCapacity = 5;
+ int estimatedAvgStringLength = 30;
+ try (VarByteSingleColumnSingleValueReaderWriter readerWriter =
+ new VarByteSingleColumnSingleValueReaderWriter(_memoryManager,
"StringColumn", initialCapacity, estimatedAvgStringLength)) {
+ int rows = 1000;
+ Random random = new Random();
+ String[] data = new String[rows];
+
+ for (int i = 0; i < rows; i++) {
+ int length = 10 + random.nextInt(100 - 10);
+ data[i] = RandomStringUtils.randomAlphanumeric(length);
+ readerWriter.setBytes(i, StringUtil.encodeUtf8(data[i]));
+ }
+
+ for (int i = 0; i < rows; i++) {
+ Assert.assertEquals(StringUtil.decodeUtf8(readerWriter.getBytes(i)),
data[i]);
+ }
+ }
+ }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
index c9ee5ca..a8b1bc3 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
@@ -368,6 +368,14 @@ public abstract class FieldSpec implements
Comparable<FieldSpec> {
throw new UnsupportedOperationException("Unsupported data type: " +
this);
}
}
+
+ /**
+ * Returns true if the data type is fixed width (e.g. INT, LONG, FLOAT,
DOUBLE)
+ * and false if it is variable width (STRING, BYTES).
+ */
+ public boolean isFixedWidth() {
+ return this != STRING && this != BYTES;
+ }
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]