gortiz commented on code in PR #10184:
URL: https://github.com/apache/pinot/pull/10184#discussion_r1154071771


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/dictionary/DictionaryIndexType.java:
##########
@@ -19,74 +19,265 @@
 
 package org.apache.pinot.segment.local.segment.index.dictionary;
 
+import com.google.common.collect.Sets;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
+import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentDictionaryCreator;
+import 
org.apache.pinot.segment.local.segment.index.loader.ConfigurableFromIndexLoadingConfig;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import 
org.apache.pinot.segment.local.segment.index.readers.BigDecimalDictionary;
+import org.apache.pinot.segment.local.segment.index.readers.BytesDictionary;
+import org.apache.pinot.segment.local.segment.index.readers.DoubleDictionary;
+import org.apache.pinot.segment.local.segment.index.readers.FloatDictionary;
+import org.apache.pinot.segment.local.segment.index.readers.IntDictionary;
+import org.apache.pinot.segment.local.segment.index.readers.LongDictionary;
+import 
org.apache.pinot.segment.local.segment.index.readers.OnHeapBigDecimalDictionary;
+import 
org.apache.pinot.segment.local.segment.index.readers.OnHeapBytesDictionary;
+import 
org.apache.pinot.segment.local.segment.index.readers.OnHeapDoubleDictionary;
+import 
org.apache.pinot.segment.local.segment.index.readers.OnHeapFloatDictionary;
+import 
org.apache.pinot.segment.local.segment.index.readers.OnHeapIntDictionary;
+import 
org.apache.pinot.segment.local.segment.index.readers.OnHeapLongDictionary;
+import 
org.apache.pinot.segment.local.segment.index.readers.OnHeapStringDictionary;
+import org.apache.pinot.segment.local.segment.index.readers.StringDictionary;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.creator.ColumnStatistics;
 import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.index.AbstractIndexType;
+import org.apache.pinot.segment.spi.index.ColumnConfigDeserializer;
+import org.apache.pinot.segment.spi.index.DictionaryIndexConfig;
 import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
-import org.apache.pinot.segment.spi.index.IndexCreator;
+import org.apache.pinot.segment.spi.index.IndexConfigDeserializer;
 import org.apache.pinot.segment.spi.index.IndexHandler;
-import org.apache.pinot.segment.spi.index.IndexReader;
+import org.apache.pinot.segment.spi.index.IndexReaderConstraintException;
 import org.apache.pinot.segment.spi.index.IndexReaderFactory;
 import org.apache.pinot.segment.spi.index.IndexType;
 import org.apache.pinot.segment.spi.index.StandardIndexes;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
-import org.apache.pinot.spi.config.table.IndexConfig;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
-public class DictionaryIndexType implements IndexType<IndexConfig, 
IndexReader, IndexCreator> {
-  public static final DictionaryIndexType INSTANCE = new DictionaryIndexType();
+public class DictionaryIndexType
+    extends AbstractIndexType<DictionaryIndexConfig, Dictionary, 
SegmentDictionaryCreator>
+    implements ConfigurableFromIndexLoadingConfig<DictionaryIndexConfig> {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DictionaryIndexType.class);
 
-  private DictionaryIndexType() {
+  protected DictionaryIndexType() {
+    super(StandardIndexes.DICTIONARY_ID);
   }
 
   @Override
-  public String getId() {
-    return StandardIndexes.DICTIONARY_ID;
+  public Class<DictionaryIndexConfig> getIndexConfigClass() {
+    return DictionaryIndexConfig.class;
   }
 
   @Override
-  public Class<IndexConfig> getIndexConfigClass() {
-    return IndexConfig.class;
+  public Map<String, DictionaryIndexConfig> fromIndexLoadingConfig(
+      IndexLoadingConfig indexLoadingConfig) {
+    Map<String, DictionaryIndexConfig> result = new HashMap<>();
+    Set<String> noDictionaryColumns = 
indexLoadingConfig.getNoDictionaryColumns();
+    Set<String> onHeapCols = indexLoadingConfig.getOnHeapDictionaryColumns();
+    Set<String> varLengthCols = 
indexLoadingConfig.getVarLengthDictionaryColumns();
+    for (String column : indexLoadingConfig.getAllKnownColumns()) {
+      if (noDictionaryColumns.contains(column)) {
+        result.put(column, DictionaryIndexConfig.disabled());
+      } else {
+        result.put(column, new 
DictionaryIndexConfig(onHeapCols.contains(column), 
varLengthCols.contains(column)));
+      }
+    }
+    return result;
   }
 
   @Override
-  public IndexConfig getDefaultConfig() {
-    return IndexConfig.DISABLED;
+  public DictionaryIndexConfig getDefaultConfig() {
+    return DictionaryIndexConfig.DEFAULT;
   }
 
   @Override
-  public IndexConfig getConfig(TableConfig tableConfig, Schema schema) {
-    throw new UnsupportedOperationException();
+  public ColumnConfigDeserializer<DictionaryIndexConfig> getDeserializer() {
+    // reads tableConfig.indexingConfig.noDictionaryConfig
+    ColumnConfigDeserializer<DictionaryIndexConfig> fromNoDictConf = 
IndexConfigDeserializer.fromMap(
+        tableConfig -> tableConfig.getIndexingConfig() == null ? 
Collections.emptyMap()
+            : tableConfig.getIndexingConfig().getNoDictionaryConfig(),
+        (accum, col, value) -> accum.put(col, 
DictionaryIndexConfig.disabled()));
+
+    // reads tableConfig.indexingConfig.noDictionaryColumns
+    ColumnConfigDeserializer<DictionaryIndexConfig> fromNoDictCol = 
IndexConfigDeserializer.fromCollection(
+        tableConfig -> tableConfig.getIndexingConfig() == null ? 
Collections.emptyList()
+            : tableConfig.getIndexingConfig().getNoDictionaryColumns(),
+        (accum, noDictionaryCol) -> accum.put(noDictionaryCol, 
DictionaryIndexConfig.disabled()));
+
+    // reads tableConfig.fieldConfigList.encodingType
+    ColumnConfigDeserializer<DictionaryIndexConfig> fromFieldConfigList = 
IndexConfigDeserializer.fromCollection(
+        TableConfig::getFieldConfigList,
+        (accum, fieldConfig) -> {
+          if (fieldConfig.getEncodingType() == FieldConfig.EncodingType.RAW) {
+            accum.put(fieldConfig.getName(), DictionaryIndexConfig.disabled());
+          }
+        });
+
+    // reads tableConfig.indexingConfig.onHeapDictionaryColumns and
+    // tableConfig.indexingConfig.varLengthDictionaryColumns
+    ColumnConfigDeserializer<DictionaryIndexConfig> fromIndexingConfig = 
(tableConfig, schema) -> {
+      IndexingConfig ic = tableConfig.getIndexingConfig();
+      if (ic == null) {
+        return Collections.emptyMap();
+      }
+      Set<String> onHeap = new HashSet<>(
+          ic.getOnHeapDictionaryColumns() == null ? Collections.emptyList() : 
ic.getOnHeapDictionaryColumns());
+      Set<String> varLength = new HashSet<>(
+          ic.getVarLengthDictionaryColumns() == null ? Collections.emptyList() 
: ic.getVarLengthDictionaryColumns()
+      );
+      Function<String, DictionaryIndexConfig> valueCalculator =
+          column -> new DictionaryIndexConfig(onHeap.contains(column), 
varLength.contains(column));
+      return Sets.union(onHeap, varLength).stream()
+          .collect(Collectors.toMap(Function.identity(), valueCalculator));
+    };
+
+    return fromNoDictConf
+        .withFallbackAlternative(fromNoDictCol)
+        .withFallbackAlternative(fromFieldConfigList)
+        .withFallbackAlternative(fromIndexingConfig)
+        .withExclusiveAlternative(IndexConfigDeserializer.fromIndexes(getId(), 
getIndexConfigClass()));
   }
 
   @Override
-  public IndexCreator createIndexCreator(IndexCreationContext context, 
IndexConfig indexConfig)
+  public SegmentDictionaryCreator createIndexCreator(IndexCreationContext 
context, DictionaryIndexConfig indexConfig) {
+    boolean useVarLengthDictionary = shouldUseVarLengthDictionary(context, 
indexConfig);
+    return new SegmentDictionaryCreator(context.getFieldSpec(), 
context.getIndexDir(), useVarLengthDictionary);
+  }
+
+  public boolean shouldUseVarLengthDictionary(IndexCreationContext context, 
DictionaryIndexConfig indexConfig) {
+    if (indexConfig.getUseVarLengthDictionary()) {
+      return true;
+    }
+    FieldSpec.DataType storedType = 
context.getFieldSpec().getDataType().getStoredType();
+    if (storedType != FieldSpec.DataType.BYTES && storedType != 
FieldSpec.DataType.BIG_DECIMAL) {
+      return false;
+    }
+    return !context.isFixedLength();
+  }
+
+  public static boolean shouldUseVarLengthDictionary(String columnName, 
Set<String> varLengthDictColumns,
+      FieldSpec.DataType columnStoredType, ColumnStatistics columnProfile) {
+    if (varLengthDictColumns.contains(columnName)) {
+      return true;
+    }
+
+    return shouldUseVarLengthDictionary(columnStoredType, columnProfile);
+  }
+
+  public static boolean shouldUseVarLengthDictionary(FieldSpec.DataType 
columnStoredType, ColumnStatistics profile) {
+    if (columnStoredType == FieldSpec.DataType.BYTES || columnStoredType == 
FieldSpec.DataType.BIG_DECIMAL) {
+      return !profile.isFixedLength();
+    }
+
+    return false;
+  }
+
+  public SegmentDictionaryCreator createIndexCreator(FieldSpec fieldSpec, File 
indexDir, boolean useVarLengthDictionary)
       throws Exception {
-    throw new UnsupportedOperationException();
+    return new SegmentDictionaryCreator(fieldSpec, indexDir, 
useVarLengthDictionary);
+  }
+
+  public static Dictionary read(SegmentDirectory.Reader segmentReader, 
ColumnMetadata columnMetadata)
+      throws IOException {
+    PinotDataBuffer dataBuffer =
+        segmentReader.getIndexFor(columnMetadata.getColumnName(), 
StandardIndexes.dictionary());
+    return read(dataBuffer, columnMetadata, new DictionaryIndexConfig(false, 
true));
+  }
+
+  public static Dictionary read(PinotDataBuffer dataBuffer, ColumnMetadata 
metadata, DictionaryIndexConfig indexConfig)
+      throws IOException {
+    FieldSpec.DataType dataType = metadata.getDataType();
+    boolean loadOnHeap = indexConfig.isOnHeap();
+    if (loadOnHeap) {
+      String columnName = metadata.getColumnName();
+      LOGGER.info("Loading on-heap dictionary for column: {}", columnName);
+    }
+
+    int length = metadata.getCardinality();
+    switch (dataType.getStoredType()) {
+      case INT:
+        return loadOnHeap ? new OnHeapIntDictionary(dataBuffer, length)
+            : new IntDictionary(dataBuffer, length);
+      case LONG:
+        return loadOnHeap ? new OnHeapLongDictionary(dataBuffer, length)
+            : new LongDictionary(dataBuffer, length);
+      case FLOAT:
+        return loadOnHeap ? new OnHeapFloatDictionary(dataBuffer, length)
+            : new FloatDictionary(dataBuffer, length);
+      case DOUBLE:
+        return loadOnHeap ? new OnHeapDoubleDictionary(dataBuffer, length)
+            : new DoubleDictionary(dataBuffer, length);
+      case BIG_DECIMAL:
+        int numBytesPerValue = metadata.getColumnMaxLength();
+        return loadOnHeap ? new OnHeapBigDecimalDictionary(dataBuffer, length, 
numBytesPerValue)
+            : new BigDecimalDictionary(dataBuffer, length, numBytesPerValue);
+      case STRING:
+        numBytesPerValue = metadata.getColumnMaxLength();
+        return loadOnHeap ? new OnHeapStringDictionary(dataBuffer, length, 
numBytesPerValue)
+            : new StringDictionary(dataBuffer, length, numBytesPerValue);
+      case BYTES:
+        numBytesPerValue = metadata.getColumnMaxLength();
+        return loadOnHeap ? new OnHeapBytesDictionary(dataBuffer, length, 
numBytesPerValue)
+            : new BytesDictionary(dataBuffer, length, numBytesPerValue);
+      default:
+        throw new IllegalStateException("Unsupported data type for dictionary: 
" + dataType);
+    }
   }
 
   @Override
-  public IndexReaderFactory<IndexReader> getReaderFactory() {
-    throw new UnsupportedOperationException();
+  public IndexReaderFactory<Dictionary> getReaderFactory() {
+    return ReaderFactory.INSTANCE;
   }
 
   @Override
   public IndexHandler createIndexHandler(SegmentDirectory segmentDirectory, 
Map<String, FieldIndexConfigs> configsByCol,
       @Nullable Schema schema, @Nullable TableConfig tableConfig) {
-    throw new UnsupportedOperationException();
+    return IndexHandler.NoOp.INSTANCE;
   }
 
-  @Override
-  public String getFileExtension(ColumnMetadata columnMetadata) {
+  public static String getFileExtension() {
     return V1Constants.Dict.FILE_EXTENSION;
   }
 
   @Override
-  public String toString() {
-    return getId();
+  public String getFileExtension(ColumnMetadata columnMetadata) {
+    return getFileExtension();
+  }
+
+  public static class ReaderFactory extends 
IndexReaderFactory.Default<DictionaryIndexConfig, Dictionary> {

Review Comment:
   BTW, InvertedIndexType.ReaderFactory is being used in 
InvertedIndexAndDictionaryBasedForwardIndexCreator. Given that this class is 
_low level_ (in the sense that we create an ephemeral reader in order to create 
the forward index), I decided to open up the factory.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to