gortiz commented on code in PR #10184:
URL: https://github.com/apache/pinot/pull/10184#discussion_r1153279242
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java:
##########
@@ -19,60 +19,183 @@
package org.apache.pinot.segment.local.segment.index.forward;
+import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
import javax.annotation.Nullable;
+import
org.apache.pinot.segment.local.segment.index.loader.ConfigurableFromIndexLoadingConfig;
+import org.apache.pinot.segment.local.segment.index.loader.ForwardIndexHandler;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.index.AbstractIndexType;
+import org.apache.pinot.segment.spi.index.ColumnConfigDeserializer;
import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
-import org.apache.pinot.segment.spi.index.IndexCreator;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
+import org.apache.pinot.segment.spi.index.IndexConfigDeserializer;
import org.apache.pinot.segment.spi.index.IndexHandler;
-import org.apache.pinot.segment.spi.index.IndexReader;
+import org.apache.pinot.segment.spi.index.IndexReaderConstraintException;
import org.apache.pinot.segment.spi.index.IndexReaderFactory;
-import org.apache.pinot.segment.spi.index.IndexType;
import org.apache.pinot.segment.spi.index.StandardIndexes;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
-import org.apache.pinot.spi.config.table.IndexConfig;
+import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
-public class ForwardIndexType implements IndexType<IndexConfig, IndexReader,
IndexCreator> {
+public class ForwardIndexType
+ extends AbstractIndexType<ForwardIndexConfig, ForwardIndexReader,
ForwardIndexCreator>
+ implements ConfigurableFromIndexLoadingConfig<ForwardIndexConfig> {
- public static final ForwardIndexType INSTANCE = new ForwardIndexType();
-
- private ForwardIndexType() {
+ protected ForwardIndexType() {
+ super(StandardIndexes.FORWARD_ID);
}
@Override
- public String getId() {
- return StandardIndexes.FORWARD_ID;
+ public Class<ForwardIndexConfig> getIndexConfigClass() {
+ return ForwardIndexConfig.class;
}
@Override
- public Class<IndexConfig> getIndexConfigClass() {
- return null;
+ public Map<String, ForwardIndexConfig>
fromIndexLoadingConfig(IndexLoadingConfig indexLoadingConfig) {
+ Set<String> disabledCols =
indexLoadingConfig.getForwardIndexDisabledColumns();
+ Map<String, ForwardIndexConfig> result = new HashMap<>();
+ Set<String> allColumns = Sets.union(disabledCols,
indexLoadingConfig.getAllKnownColumns());
+ for (String column : allColumns) {
+ ChunkCompressionType compressionType =
+ indexLoadingConfig.getCompressionConfigs() != null
+ ? indexLoadingConfig.getCompressionConfigs().get(column)
+ : null;
+ Supplier<ForwardIndexConfig> defaultConfig = () -> {
+ if (compressionType == null) {
+ return ForwardIndexConfig.DEFAULT;
+ } else {
+ return new
ForwardIndexConfig.Builder().withCompressionType(compressionType).build();
+ }
+ };
+ if (!disabledCols.contains(column)) {
+ TableConfig tableConfig = indexLoadingConfig.getTableConfig();
+ if (tableConfig == null) {
+ result.put(column, defaultConfig.get());
+ } else {
+ List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
+ if (fieldConfigList == null) {
+ result.put(column, defaultConfig.get());
+ continue;
+ }
+ FieldConfig fieldConfig = fieldConfigList.stream()
+ .filter(fc -> fc.getName().equals(column))
+ .findAny()
+ .orElse(null);
+ if (fieldConfig == null) {
+ result.put(column, defaultConfig.get());
+ continue;
+ }
+ ForwardIndexConfig.Builder builder = new
ForwardIndexConfig.Builder();
+ if (compressionType != null) {
+ builder.withCompressionType(compressionType);
+ } else {
+ FieldConfig.CompressionCodec compressionCodec =
fieldConfig.getCompressionCodec();
+ if (compressionCodec != null) {
+
builder.withCompressionType(ChunkCompressionType.valueOf(compressionCodec.name()));
+ }
+ }
+
+ result.put(column, builder.build());
+ }
+ } else {
+ result.put(column, ForwardIndexConfig.DISABLED);
+ }
+ }
+ return result;
}
@Override
- public IndexConfig getDefaultConfig() {
- return IndexConfig.ENABLED;
+ public ForwardIndexConfig getDefaultConfig() {
+ return ForwardIndexConfig.DEFAULT;
}
@Override
- public IndexConfig getConfig(TableConfig tableConfig, Schema schema) {
- throw new UnsupportedOperationException();
+ public ColumnConfigDeserializer<ForwardIndexConfig> getDeserializer() {
+ // reads tableConfig.fieldConfigList and decides what to create using the
FieldConfig properties and encoding
+ ColumnConfigDeserializer<ForwardIndexConfig> fromFieldConfig =
IndexConfigDeserializer.fromCollection(
+ TableConfig::getFieldConfigList,
+ (accum, fieldConfig) -> {
+ Map<String, String> properties = fieldConfig.getProperties();
+ if (properties != null && isDisabled(properties)) {
+ accum.put(fieldConfig.getName(), ForwardIndexConfig.DISABLED);
+ } else if (fieldConfig.getEncodingType() ==
FieldConfig.EncodingType.RAW) {
+ accum.put(fieldConfig.getName(),
createConfigFromFieldConfig(fieldConfig));
+ }
+ }
+ );
+ return IndexConfigDeserializer.fromIndexes("forward",
getIndexConfigClass())
+ .withExclusiveAlternative(fromFieldConfig);
+ }
+
+ private boolean isDisabled(Map<String, String> props) {
+ return Boolean.parseBoolean(
+ props.getOrDefault(FieldConfig.FORWARD_INDEX_DISABLED,
FieldConfig.DEFAULT_FORWARD_INDEX_DISABLED));
+ }
+
+ private ForwardIndexConfig createConfigFromFieldConfig(FieldConfig
fieldConfig) {
+ if (fieldConfig.getEncodingType() != FieldConfig.EncodingType.RAW) {
+ throw new IllegalArgumentException("Cannot build a forward index on a
field whose encoding is "
+ + fieldConfig.getEncodingType());
+ }
+ FieldConfig.CompressionCodec compressionCodec =
fieldConfig.getCompressionCodec();
+ ForwardIndexConfig.Builder builder = new ForwardIndexConfig.Builder();
+ if (compressionCodec != null) {
+
builder.withCompressionType(ChunkCompressionType.valueOf(compressionCodec.name()));
+ }
+
+ Map<String, String> properties = fieldConfig.getProperties();
+ if (properties != null) {
+ builder.withLegacyProperties(properties);
+ }
+
+ return builder.build();
+ }
+
+ public static ChunkCompressionType
getDefaultCompressionType(FieldSpec.FieldType fieldType) {
+ if (fieldType == FieldSpec.FieldType.METRIC) {
+ return ChunkCompressionType.PASS_THROUGH;
+ } else {
+ return ChunkCompressionType.LZ4;
+ }
}
@Override
- public IndexCreator createIndexCreator(IndexCreationContext context,
IndexConfig indexConfig)
+ public ForwardIndexCreator createIndexCreator(IndexCreationContext context,
ForwardIndexConfig indexConfig)
throws Exception {
- throw new UnsupportedOperationException();
+ return ForwardIndexCreatorFactory.createIndexCreator(context, indexConfig);
}
@Override
- public IndexReaderFactory<IndexReader> getReaderFactory() {
- throw new UnsupportedOperationException();
+ public IndexHandler createIndexHandler(SegmentDirectory segmentDirectory,
Map<String, FieldIndexConfigs> configsByCol,
+ @Nullable Schema schema, @Nullable TableConfig tableConfig) {
+ return new ForwardIndexHandler(segmentDirectory, configsByCol, schema,
tableConfig);
+ }
+
+ @Override
+ public IndexReaderFactory<ForwardIndexReader> getReaderFactory() {
+ return ForwardIndexReaderFactory.INSTANCE;
+ }
+
+ public static ForwardIndexReader<?> read(SegmentDirectory.Reader
segmentReader, FieldIndexConfigs fieldIndexConfigs,
+ ColumnMetadata metadata)
+ throws IndexReaderConstraintException, IOException {
+ return
StandardIndexes.forward().getReaderFactory().createIndexReader(segmentReader,
fieldIndexConfigs, metadata);
Review Comment:
I've added some javadoc to make it clearer
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]