This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e81e530  Support STRING and BYTES for no dictionary columns in realtime consuming segments (#4791)
e81e530 is described below

commit e81e5303a2e6157bb3530c621c3b8ce748788a83
Author: Sidd <[email protected]>
AuthorDate: Sun Jan 12 11:29:44 2020 -0800

    Support STRING and BYTES for no dictionary columns in realtime consuming segments (#4791)
---
 .../indexsegment/mutable/MutableSegmentImpl.java   |  92 ++++++++++++----
 .../BaseSingleColumnSingleValueReaderWriter.java   |  10 +-
 ...xedByteSingleColumnSingleValueReaderWriter.java |  10 ++
 ...VarByteSingleColumnSingleValueReaderWriter.java | 120 +++++++++++++++++++++
 .../stats/RealtimeNoDictionaryColStatistics.java   |  23 ++--
 .../index/data/source/ColumnDataSource.java        |   4 +
 .../core/common/RealtimeNoDictionaryTest.java      |  49 +++++++++
 ...yteSingleColumnSingleValueReaderWriterTest.java |  96 +++++++++++++++++
 .../java/org/apache/pinot/spi/data/FieldSpec.java  |   8 ++
 9 files changed, 375 insertions(+), 37 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
index 416f9e1..dd451cf 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
@@ -37,6 +37,7 @@ import org.apache.pinot.core.io.reader.DataFileReader;
 import org.apache.pinot.core.io.readerwriter.PinotDataBufferMemoryManager;
 import 
org.apache.pinot.core.io.readerwriter.impl.FixedByteSingleColumnMultiValueReaderWriter;
 import 
org.apache.pinot.core.io.readerwriter.impl.FixedByteSingleColumnSingleValueReaderWriter;
+import 
org.apache.pinot.core.io.readerwriter.impl.VarByteSingleColumnSingleValueReaderWriter;
 import org.apache.pinot.core.realtime.impl.RealtimeSegmentConfig;
 import org.apache.pinot.core.realtime.impl.RealtimeSegmentStatsHistory;
 import org.apache.pinot.core.realtime.impl.dictionary.BaseMutableDictionary;
@@ -76,6 +77,9 @@ public class MutableSegmentImpl implements MutableSegment {
   private static final int MIN_ROWS_TO_INDEX = 1000_000; // Min size of 
recordIdMap for updatable metrics.
   private static final int MIN_RECORD_ID_MAP_CACHE_SIZE = 10000; // Min 
overflow map size for updatable metrics.
 
+  private static final int 
NODICT_VARIABLE_WIDTH_ESTIMATED_AVERAGE_VALUE_LENGTH_DEFAULT = 100;
+  private static final int 
NODICT_VARIABLE_WIDTH_ESTIMATED_NUMBER_OF_VALUES_DEFAULT = 100_000;
+
   private final Logger _logger;
   private final long _startTimeMillis = System.currentTimeMillis();
 
@@ -183,15 +187,22 @@ public class MutableSegmentImpl implements MutableSegment 
{
       _maxNumValuesMap.put(column, 0);
 
       // Check whether to generate raw index for the column while consuming
-      // Only support generating raw index on single-value non-string columns 
that do not have inverted index while
+      // Only support generating raw index on single-value columns that do not 
have inverted index while
       // consuming. After consumption completes and the segment is built, all 
single-value columns can have raw index
       FieldSpec.DataType dataType = fieldSpec.getDataType();
-      int indexColumnSize = FieldSpec.DataType.INT.size();
-      if (noDictionaryColumns.contains(column) && 
fieldSpec.isSingleValueField()
-          && dataType != FieldSpec.DataType.STRING && 
!invertedIndexColumns.contains(column)) {
-        // No dictionary
-        indexColumnSize = dataType.size();
+      boolean isFixedWidthColumn = dataType.isFixedWidth();
+      int forwardIndexColumnSize = -1;
+      if (isNoDictionaryColumn(noDictionaryColumns, invertedIndexColumns, 
fieldSpec, column)) {
+        // no dictionary
+        // each forward index entry will be equal to size of data for that row
+        // For INT, LONG, FLOAT, DOUBLE it is equal to the number of fixed 
bytes used to store the value,
+        if (isFixedWidthColumn) {
+          forwardIndexColumnSize = dataType.size();
+        }
       } else {
+        // dictionary encoded index
+        // each forward index entry will contain a 4 byte dictionary ID
+        forwardIndexColumnSize = FieldSpec.DataType.INT.size();
         int dictionaryColumnSize;
         if (dataType == FieldSpec.DataType.STRING) {
           dictionaryColumnSize = _statsHistory.getEstimatedAvgColSize(column);
@@ -211,19 +222,41 @@ public class MutableSegmentImpl implements MutableSegment 
{
       }
 
       DataFileReader indexReaderWriter;
-      if (fieldSpec.isSingleValueField()) {
-        String allocationContext =
-            buildAllocationContext(_segmentName, column, 
V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION);
-        indexReaderWriter = new 
FixedByteSingleColumnSingleValueReaderWriter(_capacity, indexColumnSize, 
_memoryManager,
-            allocationContext);
+
+      // create forward index reader/writer
+      if (forwardIndexColumnSize == -1) {
+        // for STRING/BYTES SV column, we support raw index in consuming 
segments
+        // RealtimeSegmentStatsHistory does not have the stats for 
no-dictionary columns
+        // from previous consuming segments
+        // TODO: Add support for updating RealtimeSegmentStatsHistory with 
average column value size for no dictionary columns as well
+        // TODO: Use the stats to get estimated average length
+        // Use a smaller capacity as opposed to segment flush size
+        int initialCapacity = Math.min(_capacity, 
NODICT_VARIABLE_WIDTH_ESTIMATED_NUMBER_OF_VALUES_DEFAULT);
+        String allocationContext = buildAllocationContext(_segmentName, 
column, V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION);
+        indexReaderWriter = new 
VarByteSingleColumnSingleValueReaderWriter(_memoryManager, allocationContext, 
initialCapacity, NODICT_VARIABLE_WIDTH_ESTIMATED_AVERAGE_VALUE_LENGTH_DEFAULT);
       } else {
-        // TODO: Start with a smaller capacity on 
FixedByteSingleColumnMultiValueReaderWriter and let it expand
-        String allocationContext =
-            buildAllocationContext(_segmentName, column, 
V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION);
-        indexReaderWriter =
-            new 
FixedByteSingleColumnMultiValueReaderWriter(MAX_MULTI_VALUES_PER_ROW, 
avgNumMultiValues, _capacity,
-                indexColumnSize, _memoryManager, allocationContext);
+        // two possible cases can lead here:
+        // (1) dictionary encoded forward index
+        // (2) raw forward index for fixed width types -- INT, LONG, FLOAT, 
DOUBLE
+        if (fieldSpec.isSingleValueField()) {
+          // SV column -- both dictionary encoded and raw index are supported 
on SV
+          // columns for both fixed and variable width types
+          String allocationContext =
+              buildAllocationContext(_segmentName, column, 
V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION);
+          indexReaderWriter = new 
FixedByteSingleColumnSingleValueReaderWriter(_capacity, forwardIndexColumnSize, 
_memoryManager,
+              allocationContext);
+        } else {
+          // MV column -- only dictionary encoded index is supported on MV 
columns
+          // for both fixed and variable width types
+          // TODO: Start with a smaller capacity on 
FixedByteSingleColumnMultiValueReaderWriter and let it expand
+          String allocationContext =
+              buildAllocationContext(_segmentName, column, 
V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION);
+          indexReaderWriter =
+              new 
FixedByteSingleColumnMultiValueReaderWriter(MAX_MULTI_VALUES_PER_ROW, 
avgNumMultiValues, _capacity,
+                  forwardIndexColumnSize, _memoryManager, allocationContext);
+        }
       }
+
       _indexReaderWriterMap.put(column, indexReaderWriter);
 
       if (invertedIndexColumns.contains(column)) {
@@ -240,6 +273,20 @@ public class MutableSegmentImpl implements MutableSegment {
     _recordIdMap = enableMetricsAggregationIfPossible(config, 
noDictionaryColumns);
   }
 
+  /**
+   * Decide whether a given column should be dictionary encoded or not
+   * @param noDictionaryColumns no dictionary column set
+   * @param invertedIndexColumns inverted index column set
+   * @param fieldSpec field spec of column
+   * @param column column name
+   * @return true if column is no-dictionary, false if dictionary encoded
+   */
+  private boolean isNoDictionaryColumn(Set<String> noDictionaryColumns, 
Set<String> invertedIndexColumns,
+      FieldSpec fieldSpec, String column) {
+    return noDictionaryColumns.contains(column) && 
fieldSpec.isSingleValueField()
+        && !invertedIndexColumns.contains(column);
+  }
+
   public SegmentPartitionConfig getSegmentPartitionConfig() {
     return _segmentPartitionConfig;
   }
@@ -336,14 +383,15 @@ public class MutableSegmentImpl implements MutableSegment 
{
       String column = fieldSpec.getName();
       Object value = row.getValue(column);
       if (fieldSpec.isSingleValueField()) {
+        // SV column
         FixedByteSingleColumnSingleValueReaderWriter indexReaderWriter =
             (FixedByteSingleColumnSingleValueReaderWriter) 
_indexReaderWriterMap.get(column);
         Integer dictId = (Integer) dictIdMap.get(column);
         if (dictId != null) {
-          // Column with dictionary
+          // SV column with dictionary
           indexReaderWriter.setInt(docId, dictId);
         } else {
-          // No-dictionary column
+          // No-dictionary SV column
           FieldSpec.DataType dataType = fieldSpec.getDataType();
           switch (dataType) {
             case INT:
@@ -358,6 +406,12 @@ public class MutableSegmentImpl implements MutableSegment {
             case DOUBLE:
               indexReaderWriter.setDouble(docId, (Double) value);
               break;
+            case STRING:
+              indexReaderWriter.setString(docId, (String) value);
+              break;
+            case BYTES:
+              indexReaderWriter.setBytes(docId, (byte[]) value);
+              break;
             default:
               throw new UnsupportedOperationException(
                   "Unsupported data type: " + dataType + " for no-dictionary 
column: " + column);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/io/readerwriter/BaseSingleColumnSingleValueReaderWriter.java b/pinot-core/src/main/java/org/apache/pinot/core/io/readerwriter/BaseSingleColumnSingleValueReaderWriter.java
index 73ea205..7dc8758 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/io/readerwriter/BaseSingleColumnSingleValueReaderWriter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/io/readerwriter/BaseSingleColumnSingleValueReaderWriter.java
@@ -92,7 +92,7 @@ public abstract class 
BaseSingleColumnSingleValueReaderWriter<T extends ReaderCo
 
   @Override
   public byte[] getBytes(int row, ReaderContext context) {
-    throw new UnsupportedOperationException();
+    return getBytes(row);
   }
 
   @Override
@@ -144,4 +144,12 @@ public abstract class 
BaseSingleColumnSingleValueReaderWriter<T extends ReaderCo
   public void setBytes(int row, byte[] bytes) {
     throw new UnsupportedOperationException();
   }
+
+  public int getLengthOfShortestElement() {
+    throw new UnsupportedOperationException();
+  }
+
+  public int getLengthOfLongestElement() {
+    throw new UnsupportedOperationException();
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/io/readerwriter/impl/FixedByteSingleColumnSingleValueReaderWriter.java b/pinot-core/src/main/java/org/apache/pinot/core/io/readerwriter/impl/FixedByteSingleColumnSingleValueReaderWriter.java
index 3e3dde4..e3130e5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/io/readerwriter/impl/FixedByteSingleColumnSingleValueReaderWriter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/io/readerwriter/impl/FixedByteSingleColumnSingleValueReaderWriter.java
@@ -72,6 +72,16 @@ public class FixedByteSingleColumnSingleValueReaderWriter 
extends BaseSingleColu
   }
 
   @Override
+  public int getLengthOfShortestElement() {
+    return _columnSizesInBytes;
+  }
+
+  @Override
+  public int getLengthOfLongestElement() {
+    return _columnSizesInBytes;
+  }
+
+  @Override
   public void close()
       throws IOException {
     for (ReaderWithOffset reader : _readers) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/io/readerwriter/impl/VarByteSingleColumnSingleValueReaderWriter.java b/pinot-core/src/main/java/org/apache/pinot/core/io/readerwriter/impl/VarByteSingleColumnSingleValueReaderWriter.java
new file mode 100644
index 0000000..0327b12
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/io/readerwriter/impl/VarByteSingleColumnSingleValueReaderWriter.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.io.readerwriter.impl;
+
+import java.io.IOException;
+import org.apache.pinot.common.utils.StringUtil;
+import 
org.apache.pinot.core.io.readerwriter.BaseSingleColumnSingleValueReaderWriter;
+import org.apache.pinot.core.io.readerwriter.PinotDataBufferMemoryManager;
+import org.apache.pinot.core.io.writer.impl.MutableOffHeapByteArrayStore;
+
+public class VarByteSingleColumnSingleValueReaderWriter extends 
BaseSingleColumnSingleValueReaderWriter {
+  private final MutableOffHeapByteArrayStore _byteArrayStore;
+  private int _lengthOfShortestElement;
+  private int _lengthOfLongestElement;
+
+  public VarByteSingleColumnSingleValueReaderWriter(
+      PinotDataBufferMemoryManager memoryManager,
+      String allocationContext,
+      int estimatedMaxNumberOfValues,
+      int estimatedAverageStringLength) {
+    _byteArrayStore = new MutableOffHeapByteArrayStore(memoryManager, 
allocationContext, estimatedMaxNumberOfValues, estimatedAverageStringLength);
+    _lengthOfShortestElement = Integer.MAX_VALUE;
+    _lengthOfLongestElement = Integer.MIN_VALUE;
+  }
+
+  @Override
+  public int getLengthOfShortestElement() {
+    return _lengthOfShortestElement;
+  }
+
+  @Override
+  public int getLengthOfLongestElement() {
+    return _lengthOfLongestElement;
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    _byteArrayStore.close();
+  }
+
+  @Override
+  public void setInt(int row, int i) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getInt(int row) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setLong(int row, long l) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getLong(int row) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setFloat(int row, float f) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public float getFloat(int row) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setDouble(int row, double d) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public double getDouble(int row) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setString(int row, String val) {
+    byte[] serializedValue = StringUtil.encodeUtf8(val);
+    setBytes(row, serializedValue);
+  }
+
+  @Override
+  public String getString(int row) {
+    return StringUtil.decodeUtf8(_byteArrayStore.get(row));
+  }
+
+  @Override
+  public void setBytes(int row, byte[] value) {
+    _byteArrayStore.add(value);
+    _lengthOfLongestElement = Math.max(_lengthOfLongestElement, value.length);
+    _lengthOfShortestElement = Math.min(_lengthOfShortestElement, 
value.length);
+  }
+
+  @Override
+  public byte[] getBytes(int row) {
+    return _byteArrayStore.get(row);
+  }
+}
\ No newline at end of file
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeNoDictionaryColStatistics.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeNoDictionaryColStatistics.java
index ea85788..293311c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeNoDictionaryColStatistics.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeNoDictionaryColStatistics.java
@@ -22,6 +22,7 @@ import java.util.Set;
 import org.apache.pinot.core.common.Block;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.data.partition.PartitionFunction;
+import 
org.apache.pinot.core.io.readerwriter.BaseSingleColumnSingleValueReaderWriter;
 import org.apache.pinot.core.segment.creator.ColumnStatistics;
 import org.apache.pinot.core.segment.index.data.source.ColumnDataSource;
 
@@ -30,12 +31,15 @@ import static 
org.apache.pinot.core.common.Constants.UNKNOWN_CARDINALITY;
 
 public class RealtimeNoDictionaryColStatistics implements ColumnStatistics {
 
+  final BaseSingleColumnSingleValueReaderWriter _forwardIndex;
   final BlockValSet _blockValSet;
   final int _numDocIds;
   final String _operatorName;
 
   public RealtimeNoDictionaryColStatistics(ColumnDataSource dataSource) {
     _operatorName = dataSource.getOperatorName();
+    // no-dictionary is only supported for SV columns
+    _forwardIndex = 
(BaseSingleColumnSingleValueReaderWriter)dataSource.getForwardIndex();
     Block block = dataSource.nextBlock();
     _numDocIds = block.getMetadata().getEndDocId() + 1;
     _blockValSet = block.getBlockValueSet();
@@ -63,12 +67,12 @@ public class RealtimeNoDictionaryColStatistics implements 
ColumnStatistics {
 
   @Override
   public int getLengthOfShortestElement() {
-    return lengthOfDataType(); // Only fixed length data types supported.
+    return _forwardIndex.getLengthOfShortestElement();
   }
 
   @Override
   public int getLengthOfLargestElement() {
-    return lengthOfDataType(); // Only fixed length data types supported.
+    return _forwardIndex.getLengthOfLongestElement();
   }
 
   @Override
@@ -105,19 +109,4 @@ public class RealtimeNoDictionaryColStatistics implements 
ColumnStatistics {
   public Set<Integer> getPartitions() {
     return null;
   }
-
-  private int lengthOfDataType() {
-    switch (_blockValSet.getValueType()) {
-      case INT:
-        return Integer.BYTES;
-      case LONG:
-        return Long.BYTES;
-      case FLOAT:
-        return Float.BYTES;
-      case DOUBLE:
-        return Double.BYTES;
-      default:
-        throw new UnsupportedOperationException();
-    }
-  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/data/source/ColumnDataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/data/source/ColumnDataSource.java
index c75a69e..7fdf7a7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/data/source/ColumnDataSource.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/data/source/ColumnDataSource.java
@@ -190,4 +190,8 @@ public final class ColumnDataSource extends DataSource {
   public String getOperatorName() {
     return _operatorName;
   }
+
+  public DataFileReader getForwardIndex() {
+    return _forwardIndex;
+  }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/RealtimeNoDictionaryTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/RealtimeNoDictionaryTest.java
index 3e1e997..ce6eb4f 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/RealtimeNoDictionaryTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/RealtimeNoDictionaryTest.java
@@ -21,10 +21,14 @@ package org.apache.pinot.core.common;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
+import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.MetricFieldSpec;
+import org.apache.commons.lang.RandomStringUtils;
 import org.apache.pinot.core.io.readerwriter.PinotDataBufferMemoryManager;
 import 
org.apache.pinot.core.io.readerwriter.impl.FixedByteSingleColumnSingleValueReaderWriter;
+import 
org.apache.pinot.core.io.readerwriter.impl.VarByteSingleColumnSingleValueReaderWriter;
 import org.apache.pinot.core.io.writer.impl.DirectMemoryManager;
 import org.apache.pinot.core.segment.index.data.source.ColumnDataSource;
 import org.testng.Assert;
@@ -38,11 +42,15 @@ public class RealtimeNoDictionaryTest {
   private static final String LONG_COL_NAME = "longcol";
   private static final String FLOAT_COL_NAME = "floatcol";
   private static final String DOUBLE_COL_NAME = "doublecol";
+  private static final String STRING_COL_NAME = "stringcol";
+  private static final String BYTES_COL_NAME = "bytescol";
   private static final int NUM_ROWS = 1000;
   private int[] _intVals = new int[NUM_ROWS];
   private long[] _longVals = new long[NUM_ROWS];
   private float[] _floatVals = new float[NUM_ROWS];
   private double[] _doubleVals = new double[NUM_ROWS];
+  private String[] _stringVals = new String[NUM_ROWS];
+
   private Random _random;
 
   private PinotDataBufferMemoryManager _memoryManager;
@@ -63,6 +71,8 @@ public class RealtimeNoDictionaryTest {
     FieldSpec longSpec = new MetricFieldSpec(LONG_COL_NAME, 
FieldSpec.DataType.LONG);
     FieldSpec floatSpec = new MetricFieldSpec(FLOAT_COL_NAME, 
FieldSpec.DataType.FLOAT);
     FieldSpec doubleSpec = new MetricFieldSpec(DOUBLE_COL_NAME, 
FieldSpec.DataType.DOUBLE);
+    FieldSpec stringSpec = new DimensionFieldSpec(STRING_COL_NAME, 
FieldSpec.DataType.STRING, true);
+    FieldSpec bytesSpec = new DimensionFieldSpec(BYTES_COL_NAME, 
FieldSpec.DataType.BYTES, true);
     _random = new Random(seed);
 
     FixedByteSingleColumnSingleValueReaderWriter intRawIndex =
@@ -77,6 +87,10 @@ public class RealtimeNoDictionaryTest {
     FixedByteSingleColumnSingleValueReaderWriter doubleRawIndex =
         new 
FixedByteSingleColumnSingleValueReaderWriter(_random.nextInt(NUM_ROWS) + 1, 
Double.BYTES, _memoryManager,
             "double");
+    VarByteSingleColumnSingleValueReaderWriter stringRawIndex =
+        new VarByteSingleColumnSingleValueReaderWriter(_memoryManager, 
"StringColumn", 512, 30);
+    VarByteSingleColumnSingleValueReaderWriter bytesRawIndex =
+        new VarByteSingleColumnSingleValueReaderWriter(_memoryManager, 
"BytesColumn", 512, 30);
 
     for (int i = 0; i < NUM_ROWS; i++) {
       _intVals[i] = _random.nextInt();
@@ -87,6 +101,11 @@ public class RealtimeNoDictionaryTest {
       floatRawIndex.setFloat(i, _floatVals[i]);
       _doubleVals[i] = _random.nextDouble();
       doubleRawIndex.setDouble(i, _doubleVals[i]);
+      // generate a random string of length between 10 and 100
+      int length = 10 + _random.nextInt(100 - 10);
+      _stringVals[i] = RandomStringUtils.randomAlphanumeric(length);
+      stringRawIndex.setString(i, _stringVals[i]);
+      bytesRawIndex.setBytes(i, StringUtil.encodeUtf8(_stringVals[i]));
     }
 
     Map<String, DataSource> dataSourceBlock = new HashMap<>();
@@ -95,11 +114,41 @@ public class RealtimeNoDictionaryTest {
     dataSourceBlock.put(FLOAT_COL_NAME, new ColumnDataSource(floatSpec, 
NUM_ROWS, 0, floatRawIndex, null, null, null, null));
     dataSourceBlock
         .put(DOUBLE_COL_NAME, new ColumnDataSource(doubleSpec, NUM_ROWS, 0, 
doubleRawIndex, null, null, null, null));
+    dataSourceBlock
+        .put(STRING_COL_NAME, new ColumnDataSource(stringSpec, NUM_ROWS, 0, 
stringRawIndex, null, null, null, null));
+    dataSourceBlock
+        .put(BYTES_COL_NAME, new ColumnDataSource(bytesSpec, NUM_ROWS, 0, 
bytesRawIndex, null, null, null, null));
 
     return new DataFetcher(dataSourceBlock);
   }
 
   @Test
+  public void testStringAndBytes() {
+    long seed = new Random().nextLong();
+    DataFetcher dataFetcher = makeDataFetcher(seed);
+    int[] docIds = new int[NUM_ROWS];
+    int numDocIds = _random.nextInt(NUM_ROWS) + 1;
+    for (int i = 0; i < numDocIds; i++) {
+      docIds[i] = _random.nextInt(NUM_ROWS);
+    }
+    try {
+      String[] stringValues = new String[NUM_ROWS];
+      dataFetcher.fetchStringValues(STRING_COL_NAME, docIds, numDocIds, 
stringValues);
+      for (int i = 0; i < numDocIds; i++) {
+        Assert.assertEquals(stringValues[i], _stringVals[docIds[i]], " for row 
" + docIds[i]);
+      }
+      byte[][] byteValues = new byte[NUM_ROWS][];
+      dataFetcher.fetchBytesValues(BYTES_COL_NAME, docIds, numDocIds, 
byteValues);
+      for (int i = 0; i < numDocIds; i++) {
+        Assert.assertEquals(StringUtil.decodeUtf8(byteValues[i]), 
_stringVals[docIds[i]], " for row " + docIds[i]);
+      }
+    } catch (Throwable t) {
+      t.printStackTrace();
+      Assert.fail("Failed with seed " + seed);
+    }
+  }
+
+  @Test
   public void testIntColumn() {
     long seed = new Random().nextLong();
     DataFetcher dataFetcher = makeDataFetcher(seed);
diff --git a/pinot-core/src/test/java/org/apache/pinot/index/readerwriter/VarByteSingleColumnSingleValueReaderWriterTest.java b/pinot-core/src/test/java/org/apache/pinot/index/readerwriter/VarByteSingleColumnSingleValueReaderWriterTest.java
new file mode 100644
index 0000000..9458243
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/index/readerwriter/VarByteSingleColumnSingleValueReaderWriterTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.index.readerwriter;
+
+import java.io.IOException;
+import java.util.Random;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.core.io.readerwriter.PinotDataBufferMemoryManager;
+import 
org.apache.pinot.core.io.readerwriter.impl.VarByteSingleColumnSingleValueReaderWriter;
+import org.apache.pinot.core.io.writer.impl.DirectMemoryManager;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class VarByteSingleColumnSingleValueReaderWriterTest {
+  private PinotDataBufferMemoryManager _memoryManager;
+
+  @BeforeClass
+  public void setUp() {
+    _memoryManager = new 
DirectMemoryManager(VarByteSingleColumnSingleValueReaderWriterTest.class.getName());
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+    _memoryManager.close();
+  }
+
+  @Test
+  public void testString()
+      throws IOException {
+    // use arbitrary cardinality and avg string length
+    // we will test with complete randomness
+    int initialCapacity = 5;
+    int estimatedAvgStringLength = 30;
+    try (VarByteSingleColumnSingleValueReaderWriter readerWriter =
+        new VarByteSingleColumnSingleValueReaderWriter(_memoryManager, 
"StringColumn",  initialCapacity, estimatedAvgStringLength)) {
+      int rows = 1000;
+      Random random = new Random();
+      String[] data = new String[rows];
+
+      for (int i = 0; i < rows; i++) {
+        // generate a random string of length between 10 and 100
+        int length = 10 + random.nextInt(100 - 10);
+        data[i] = RandomStringUtils.randomAlphanumeric(length);
+        readerWriter.setString(i, data[i]);
+      }
+
+      for (int i = 0; i < rows; i++) {
+        Assert.assertEquals(readerWriter.getString(i), data[i]);
+      }
+    }
+  }
+
+  @Test
+  public void testBytes()
+      throws IOException {
+    int initialCapacity = 5;
+    int estimatedAvgStringLength = 30;
+    try (VarByteSingleColumnSingleValueReaderWriter readerWriter =
+        new VarByteSingleColumnSingleValueReaderWriter(_memoryManager, 
"StringColumn",  initialCapacity, estimatedAvgStringLength)) {
+      int rows = 1000;
+      Random random = new Random();
+      String[] data = new String[rows];
+
+      for (int i = 0; i < rows; i++) {
+        int length = 10 + random.nextInt(100 - 10);
+        data[i] = RandomStringUtils.randomAlphanumeric(length);
+        readerWriter.setBytes(i, StringUtil.encodeUtf8(data[i]));
+      }
+
+      for (int i = 0; i < rows; i++) {
+        Assert.assertEquals(StringUtil.decodeUtf8(readerWriter.getBytes(i)), 
data[i]);
+      }
+    }
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
index c9ee5ca..a8b1bc3 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
@@ -368,6 +368,14 @@ public abstract class FieldSpec implements 
Comparable<FieldSpec> {
           throw new UnsupportedOperationException("Unsupported data type: " + 
this);
       }
     }
+
+    /**
+     * Check if the data type is for fixed width data (INT, LONG, FLOAT, 
DOUBLE)
+     * or variable width data (STRING, BYTES)
+     */
+    public boolean isFixedWidth() {
+      return this != STRING && this != BYTES;
+    }
   }
 
   @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to