gortiz commented on code in PR #10184:
URL: https://github.com/apache/pinot/pull/10184#discussion_r1154079816


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java:
##########
@@ -19,60 +19,183 @@
 
 package org.apache.pinot.segment.local.segment.index.forward;
 
+import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
 import javax.annotation.Nullable;
+import 
org.apache.pinot.segment.local.segment.index.loader.ConfigurableFromIndexLoadingConfig;
+import org.apache.pinot.segment.local.segment.index.loader.ForwardIndexHandler;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.index.AbstractIndexType;
+import org.apache.pinot.segment.spi.index.ColumnConfigDeserializer;
 import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
-import org.apache.pinot.segment.spi.index.IndexCreator;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
+import org.apache.pinot.segment.spi.index.IndexConfigDeserializer;
 import org.apache.pinot.segment.spi.index.IndexHandler;
-import org.apache.pinot.segment.spi.index.IndexReader;
+import org.apache.pinot.segment.spi.index.IndexReaderConstraintException;
 import org.apache.pinot.segment.spi.index.IndexReaderFactory;
-import org.apache.pinot.segment.spi.index.IndexType;
 import org.apache.pinot.segment.spi.index.StandardIndexes;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
-import org.apache.pinot.spi.config.table.IndexConfig;
+import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 
 
-public class ForwardIndexType implements IndexType<IndexConfig, IndexReader, 
IndexCreator> {
+public class ForwardIndexType
+    extends AbstractIndexType<ForwardIndexConfig, ForwardIndexReader, 
ForwardIndexCreator>
+    implements ConfigurableFromIndexLoadingConfig<ForwardIndexConfig> {
 
-  public static final ForwardIndexType INSTANCE = new ForwardIndexType();
-
-  private ForwardIndexType() {
+  protected ForwardIndexType() {
+    super(StandardIndexes.FORWARD_ID);
   }
 
   @Override
-  public String getId() {
-    return StandardIndexes.FORWARD_ID;
+  public Class<ForwardIndexConfig> getIndexConfigClass() {
+    return ForwardIndexConfig.class;
   }
 
   @Override
-  public Class<IndexConfig> getIndexConfigClass() {
-    return null;
+  public Map<String, ForwardIndexConfig> 
fromIndexLoadingConfig(IndexLoadingConfig indexLoadingConfig) {
+    Set<String> disabledCols = 
indexLoadingConfig.getForwardIndexDisabledColumns();
+    Map<String, ForwardIndexConfig> result = new HashMap<>();
+    Set<String> allColumns = Sets.union(disabledCols, 
indexLoadingConfig.getAllKnownColumns());
+    for (String column : allColumns) {
+      ChunkCompressionType compressionType =
+          indexLoadingConfig.getCompressionConfigs() != null
+              ? indexLoadingConfig.getCompressionConfigs().get(column)
+              : null;
+      Supplier<ForwardIndexConfig> defaultConfig = () -> {
+        if (compressionType == null) {
+          return ForwardIndexConfig.DEFAULT;
+        } else {
+          return new 
ForwardIndexConfig.Builder().withCompressionType(compressionType).build();
+        }
+      };
+      if (!disabledCols.contains(column)) {
+        TableConfig tableConfig = indexLoadingConfig.getTableConfig();
+        if (tableConfig == null) {
+          result.put(column, defaultConfig.get());
+        } else {
+          List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
+          if (fieldConfigList == null) {
+            result.put(column, defaultConfig.get());
+            continue;
+          }
+          FieldConfig fieldConfig = fieldConfigList.stream()
+              .filter(fc -> fc.getName().equals(column))
+              .findAny()
+              .orElse(null);
+          if (fieldConfig == null) {
+            result.put(column, defaultConfig.get());
+            continue;
+          }
+          ForwardIndexConfig.Builder builder = new 
ForwardIndexConfig.Builder();
+          if (compressionType != null) {
+            builder.withCompressionType(compressionType);
+          } else {
+            FieldConfig.CompressionCodec compressionCodec = 
fieldConfig.getCompressionCodec();
+            if (compressionCodec != null) {
+              
builder.withCompressionType(ChunkCompressionType.valueOf(compressionCodec.name()));
+            }
+          }
+
+          result.put(column, builder.build());
+        }
+      } else {
+        result.put(column, ForwardIndexConfig.DISABLED);
+      }
+    }
+    return result;
   }
 
   @Override
-  public IndexConfig getDefaultConfig() {
-    return IndexConfig.ENABLED;
+  public ForwardIndexConfig getDefaultConfig() {
+    return ForwardIndexConfig.DEFAULT;
   }
 
   @Override
-  public IndexConfig getConfig(TableConfig tableConfig, Schema schema) {
-    throw new UnsupportedOperationException();
+  public ColumnConfigDeserializer<ForwardIndexConfig> getDeserializer() {
+    // reads tableConfig.fieldConfigList and decides what to create using the 
FieldConfig properties and encoding
+    ColumnConfigDeserializer<ForwardIndexConfig> fromFieldConfig = 
IndexConfigDeserializer.fromCollection(
+        TableConfig::getFieldConfigList,
+        (accum, fieldConfig) -> {
+          Map<String, String> properties = fieldConfig.getProperties();
+          if (properties != null && isDisabled(properties)) {
+            accum.put(fieldConfig.getName(), ForwardIndexConfig.DISABLED);
+          } else if (fieldConfig.getEncodingType() == 
FieldConfig.EncodingType.RAW) {
+            accum.put(fieldConfig.getName(), 
createConfigFromFieldConfig(fieldConfig));
+          }
+        }
+    );
+    return IndexConfigDeserializer.fromIndexes("forward", 
getIndexConfigClass())
+        .withExclusiveAlternative(fromFieldConfig);
+  }
+
+  private boolean isDisabled(Map<String, String> props) {
+    return Boolean.parseBoolean(
+        props.getOrDefault(FieldConfig.FORWARD_INDEX_DISABLED, 
FieldConfig.DEFAULT_FORWARD_INDEX_DISABLED));
+  }
+
+  private ForwardIndexConfig createConfigFromFieldConfig(FieldConfig 
fieldConfig) {
+    if (fieldConfig.getEncodingType() != FieldConfig.EncodingType.RAW) {
+      throw new IllegalArgumentException("Cannot build a forward index on a 
field whose encoding is "

Review Comment:
   I see. The code seems to be ok. What you mention is explicitly tested by the 
new `ForwardIndexTypeTest` (for example in `oldConfEnableDict`). I've added a 
couple of extra tests that prove that when encoding is `DICTIONARY` the forward 
index is created by default, ignoring other attributes (like compression). I 
don't know if that is the expected behavior. It is easy to change in the future 
in case we need to.
   
   Anyway, I'm going to change the code a little bit to make it more readable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to