This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 88edfd49cc Update Segment builder to use column major tables (#11776)
88edfd49cc is described below
commit 88edfd49cc27042ae923e053cd47f1bec7fb5663
Author: Erich <[email protected]>
AuthorDate: Sun Oct 22 13:05:59 2023 -0400
Update Segment builder to use column major tables (#11776)
---
.../realtime/RealtimeSegmentDataManager.java | 6 +-
.../converter/RealtimeSegmentConverterTest.java | 319 ++++++++++++++++++++-
.../converter/RealtimeSegmentConverter.java | 31 +-
.../creator/impl/SegmentColumnarIndexCreator.java | 53 ++++
.../impl/SegmentIndexCreationDriverImpl.java | 95 ++++--
.../pinot/segment/spi/creator/SegmentCreator.java | 12 +
.../spi/creator/SegmentGeneratorConfig.java | 4 -
.../pinot/spi/config/table/IndexingConfig.java | 9 +
.../table/ingestion/StreamIngestionConfig.java | 11 +
.../spi/utils/builder/TableConfigBuilder.java | 7 +
10 files changed, 507 insertions(+), 40 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index e772eae7e8..f3e8fc588e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -934,7 +934,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
SegmentZKPropsConfig segmentZKPropsConfig = new SegmentZKPropsConfig();
segmentZKPropsConfig.setStartOffset(_segmentZKMetadata.getStartOffset());
segmentZKPropsConfig.setEndOffset(_currentOffset.toString());
- // lets convert the segment now
+ // let's convert the segment now
RealtimeSegmentConverter converter =
new RealtimeSegmentConverter(_realtimeSegment, segmentZKPropsConfig,
tempSegmentFolder.getAbsolutePath(),
_schema, _tableNameWithType, _tableConfig,
_segmentZKMetadata.getSegmentName(),
@@ -951,8 +951,8 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
}
final long buildTimeMillis = now() - lockAcquireTimeMillis;
final long waitTimeMillis = lockAcquireTimeMillis - startTimeMillis;
- _segmentLogger
- .info("Successfully built segment in {} ms, after lockWaitTime {}
ms", buildTimeMillis, waitTimeMillis);
+ _segmentLogger.info("Successfully built segment (Column Mode: {}) in {}
ms, after lockWaitTime {} ms",
+ converter.isColumnMajorEnabled(), buildTimeMillis, waitTimeMillis);
File dataDir = new File(_resourceDataDir);
File indexDir = new File(dataDir, _segmentNameStr);
diff --git a/pinot-core/src/test/java/org/apache/pinot/realtime/converter/RealtimeSegmentConverterTest.java b/pinot-core/src/test/java/org/apache/pinot/realtime/converter/RealtimeSegmentConverterTest.java
index 28df031be2..50f1396c10 100644
--- a/pinot-core/src/test/java/org/apache/pinot/realtime/converter/RealtimeSegmentConverterTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/realtime/converter/RealtimeSegmentConverterTest.java
@@ -22,20 +22,33 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
import org.apache.pinot.segment.local.io.writer.impl.DirectMemoryManager;
import
org.apache.pinot.segment.local.realtime.converter.ColumnIndicesForRealtimeTable;
import
org.apache.pinot.segment.local.realtime.converter.RealtimeSegmentConverter;
import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
import
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
+import
org.apache.pinot.segment.local.segment.index.column.PhysicalColumnIndexContainer;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.segment.store.SegmentLocalFSDirectory;
import
org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProviderFactory;
+import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.index.DictionaryIndexConfig;
import org.apache.pinot.segment.spi.index.StandardIndexes;
+import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.spi.config.table.IndexConfig;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.SegmentZKPropsConfig;
@@ -44,6 +57,8 @@ import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.TimeGranularitySpec;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.ReadMode;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.Test;
@@ -84,7 +99,7 @@ public class RealtimeSegmentConverterTest {
}
@Test
- public void testNoRecordsIndexed()
+ public void testNoRecordsIndexedRowMajorSegmentBuilder()
throws Exception {
File tmpDir = new File(TMP_DIR, "tmp_" + System.currentTimeMillis());
TableConfig tableConfig =
@@ -93,7 +108,8 @@ public class RealtimeSegmentConverterTest {
.setRangeIndexColumns(Lists.newArrayList(STRING_COLUMN2))
.setNoDictionaryColumns(Lists.newArrayList(LONG_COLUMN2))
.setVarLengthDictionaryColumns(Lists.newArrayList(STRING_COLUMN3))
-
.setOnHeapDictionaryColumns(Lists.newArrayList(LONG_COLUMN3)).build();
+
.setOnHeapDictionaryColumns(Lists.newArrayList(LONG_COLUMN3)).setColumnMajorSegmentBuilderEnabled(false)
+ .build();
Schema schema = new
Schema.SchemaBuilder().addSingleValueDimension(STRING_COLUMN1,
FieldSpec.DataType.STRING)
.addSingleValueDimension(STRING_COLUMN2, FieldSpec.DataType.STRING)
.addSingleValueDimension(STRING_COLUMN3, FieldSpec.DataType.STRING)
@@ -130,8 +146,8 @@ public class RealtimeSegmentConverterTest {
segmentZKPropsConfig.setStartOffset("1");
segmentZKPropsConfig.setEndOffset("100");
ColumnIndicesForRealtimeTable cdc = new
ColumnIndicesForRealtimeTable(indexingConfig.getSortedColumn().get(0),
- indexingConfig.getInvertedIndexColumns(), null, null,
- indexingConfig.getNoDictionaryColumns(),
indexingConfig.getVarLengthDictionaryColumns());
+ indexingConfig.getInvertedIndexColumns(), null, null,
indexingConfig.getNoDictionaryColumns(),
+ indexingConfig.getVarLengthDictionaryColumns());
RealtimeSegmentConverter converter =
new RealtimeSegmentConverter(mutableSegmentImpl, segmentZKPropsConfig,
outputDir.getAbsolutePath(), schema,
tableNameWithType, tableConfig, segmentName, cdc, false);
@@ -148,6 +164,301 @@ public class RealtimeSegmentConverterTest {
assertEquals(segmentMetadata.getEndOffset(), "100");
}
+ @Test
+ public void test10RecordsIndexedRowMajorSegmentBuilder()
+ throws Exception {
+ File tmpDir = new File(TMP_DIR, "tmp_" + System.currentTimeMillis());
+ TableConfig tableConfig =
+ new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable")
+ .setTimeColumnName(DATE_TIME_COLUMN)
+ .setInvertedIndexColumns(Lists.newArrayList(STRING_COLUMN1,
LONG_COLUMN1))
+ .setSortedColumn(LONG_COLUMN1)
+ .setRangeIndexColumns(Lists.newArrayList(STRING_COLUMN2))
+ .setNoDictionaryColumns(Lists.newArrayList(LONG_COLUMN2))
+ .setVarLengthDictionaryColumns(Lists.newArrayList(STRING_COLUMN3))
+ .setOnHeapDictionaryColumns(Lists.newArrayList(LONG_COLUMN3))
+ .setColumnMajorSegmentBuilderEnabled(false)
+ .build();
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension(STRING_COLUMN1, FieldSpec.DataType.STRING)
+ .addSingleValueDimension(STRING_COLUMN2, FieldSpec.DataType.STRING)
+ .addSingleValueDimension(STRING_COLUMN3, FieldSpec.DataType.STRING)
+ .addSingleValueDimension(STRING_COLUMN4, FieldSpec.DataType.STRING)
+ .addSingleValueDimension(LONG_COLUMN1, FieldSpec.DataType.LONG)
+ .addSingleValueDimension(LONG_COLUMN2, FieldSpec.DataType.LONG)
+ .addSingleValueDimension(LONG_COLUMN3, FieldSpec.DataType.LONG)
+ .addMultiValueDimension(MV_INT_COLUMN, FieldSpec.DataType.INT)
+ .addMetric(LONG_COLUMN4, FieldSpec.DataType.LONG)
+ .addDateTime(DATE_TIME_COLUMN, FieldSpec.DataType.LONG,
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
+ .build();
+
+ String tableNameWithType = tableConfig.getTableName();
+ String segmentName = "testTable__0__0__123456";
+ IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+
+ DictionaryIndexConfig varLengthDictConf = new DictionaryIndexConfig(false,
true);
+
+ RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
+ new
RealtimeSegmentConfig.Builder().setTableNameWithType(tableNameWithType).setSegmentName(segmentName)
+
.setStreamName(tableNameWithType).setSchema(schema).setTimeColumnName(DATE_TIME_COLUMN).setCapacity(1000)
+ .setAvgNumMultiValues(3)
+ .setIndex(Sets.newHashSet(LONG_COLUMN2),
StandardIndexes.dictionary(), DictionaryIndexConfig.DISABLED)
+ .setIndex(Sets.newHashSet(Sets.newHashSet(STRING_COLUMN3)),
StandardIndexes.dictionary(), varLengthDictConf)
+ .setIndex(Sets.newHashSet(STRING_COLUMN1, LONG_COLUMN1),
StandardIndexes.inverted(), IndexConfig.ENABLED)
+
.setSegmentZKMetadata(getSegmentZKMetadata(segmentName)).setOffHeap(true)
+ .setMemoryManager(new DirectMemoryManager(segmentName))
+ .setStatsHistory(RealtimeSegmentStatsHistory.deserialzeFrom(new
File(tmpDir, "stats")))
+ .setConsumerDir(new File(tmpDir, "consumerDir").getAbsolutePath());
+
+ // create mutable segment impl
+ MutableSegmentImpl mutableSegmentImpl = new
MutableSegmentImpl(realtimeSegmentConfigBuilder.build(), null);
+ List<GenericRow> rows = generateTestData();
+
+ for (GenericRow row : rows) {
+ mutableSegmentImpl.index(row, null);
+ }
+
+ File outputDir = new File(tmpDir, "outputDir");
+ SegmentZKPropsConfig segmentZKPropsConfig = new SegmentZKPropsConfig();
+ segmentZKPropsConfig.setStartOffset("1");
+ segmentZKPropsConfig.setEndOffset("100");
+ ColumnIndicesForRealtimeTable cdc = new
ColumnIndicesForRealtimeTable(indexingConfig.getSortedColumn().get(0),
+ indexingConfig.getInvertedIndexColumns(), null, null,
indexingConfig.getNoDictionaryColumns(),
+ indexingConfig.getVarLengthDictionaryColumns());
+ RealtimeSegmentConverter converter =
+ new RealtimeSegmentConverter(mutableSegmentImpl, segmentZKPropsConfig,
outputDir.getAbsolutePath(), schema,
+ tableNameWithType, tableConfig, segmentName, cdc, false);
+ converter.build(SegmentVersion.v3, null);
+
+ File indexDir = new File(outputDir, segmentName);
+ SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
+ assertEquals(segmentMetadata.getVersion(), SegmentVersion.v3);
+ assertEquals(segmentMetadata.getTotalDocs(), rows.size());
+ assertEquals(segmentMetadata.getTimeColumn(), DATE_TIME_COLUMN);
+ assertEquals(segmentMetadata.getTimeUnit(), TimeUnit.MILLISECONDS);
+
+ long expectedStartTime = (long) rows.get(0).getValue(DATE_TIME_COLUMN);
+ assertEquals(segmentMetadata.getStartTime(), expectedStartTime);
+ long expectedEndTime = (long) rows.get(rows.size() -
1).getValue(DATE_TIME_COLUMN);
+ assertEquals(segmentMetadata.getEndTime(), expectedEndTime);
+
+
assertTrue(segmentMetadata.getAllColumns().containsAll(schema.getColumnNames()));
+ assertEquals(segmentMetadata.getStartOffset(), "1");
+ assertEquals(segmentMetadata.getEndOffset(), "100");
+
+ testSegment(rows, indexDir, tableConfig, segmentMetadata);
+ }
+
+ @Test
+ public void testNoRecordsIndexedColumnMajorSegmentBuilder()
+ throws Exception {
+ File tmpDir = new File(TMP_DIR, "tmp_" + System.currentTimeMillis());
+ TableConfig tableConfig =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName(DATE_TIME_COLUMN)
+
.setInvertedIndexColumns(Lists.newArrayList(STRING_COLUMN1)).setSortedColumn(LONG_COLUMN1)
+ .setRangeIndexColumns(Lists.newArrayList(STRING_COLUMN2))
+ .setNoDictionaryColumns(Lists.newArrayList(LONG_COLUMN2))
+ .setVarLengthDictionaryColumns(Lists.newArrayList(STRING_COLUMN3))
+
.setOnHeapDictionaryColumns(Lists.newArrayList(LONG_COLUMN3)).setColumnMajorSegmentBuilderEnabled(true)
+ .build();
+ Schema schema = new
Schema.SchemaBuilder().addSingleValueDimension(STRING_COLUMN1,
FieldSpec.DataType.STRING)
+ .addSingleValueDimension(STRING_COLUMN2, FieldSpec.DataType.STRING)
+ .addSingleValueDimension(STRING_COLUMN3, FieldSpec.DataType.STRING)
+ .addSingleValueDimension(STRING_COLUMN4, FieldSpec.DataType.STRING)
+ .addSingleValueDimension(LONG_COLUMN1, FieldSpec.DataType.LONG)
+ .addSingleValueDimension(LONG_COLUMN2, FieldSpec.DataType.LONG)
+ .addSingleValueDimension(LONG_COLUMN3, FieldSpec.DataType.LONG)
+ .addMultiValueDimension(MV_INT_COLUMN, FieldSpec.DataType.INT)
+ .addMetric(LONG_COLUMN4, FieldSpec.DataType.LONG)
+ .addDateTime(DATE_TIME_COLUMN, FieldSpec.DataType.LONG,
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
+
+ String tableNameWithType = tableConfig.getTableName();
+ String segmentName = "testTable__0__0__123456";
+ IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+
+ DictionaryIndexConfig varLengthDictConf = new DictionaryIndexConfig(false,
true);
+
+ RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
+ new
RealtimeSegmentConfig.Builder().setTableNameWithType(tableNameWithType).setSegmentName(segmentName)
+
.setStreamName(tableNameWithType).setSchema(schema).setTimeColumnName(DATE_TIME_COLUMN).setCapacity(1000)
+ .setAvgNumMultiValues(3)
+ .setIndex(Sets.newHashSet(LONG_COLUMN2),
StandardIndexes.dictionary(), DictionaryIndexConfig.DISABLED)
+ .setIndex(Sets.newHashSet(Sets.newHashSet(STRING_COLUMN3)),
StandardIndexes.dictionary(), varLengthDictConf)
+ .setIndex(Sets.newHashSet(STRING_COLUMN1),
StandardIndexes.inverted(), IndexConfig.ENABLED)
+
.setSegmentZKMetadata(getSegmentZKMetadata(segmentName)).setOffHeap(true)
+ .setMemoryManager(new DirectMemoryManager(segmentName))
+ .setStatsHistory(RealtimeSegmentStatsHistory.deserialzeFrom(new
File(tmpDir, "stats")))
+ .setConsumerDir(new File(tmpDir, "consumerDir").getAbsolutePath());
+
+ // create mutable segment impl
+ MutableSegmentImpl mutableSegmentImpl = new
MutableSegmentImpl(realtimeSegmentConfigBuilder.build(), null);
+
+ File outputDir = new File(tmpDir, "outputDir");
+ SegmentZKPropsConfig segmentZKPropsConfig = new SegmentZKPropsConfig();
+ segmentZKPropsConfig.setStartOffset("1");
+ segmentZKPropsConfig.setEndOffset("100");
+ ColumnIndicesForRealtimeTable cdc = new
ColumnIndicesForRealtimeTable(indexingConfig.getSortedColumn().get(0),
+ indexingConfig.getInvertedIndexColumns(), null, null,
indexingConfig.getNoDictionaryColumns(),
+ indexingConfig.getVarLengthDictionaryColumns());
+ RealtimeSegmentConverter converter =
+ new RealtimeSegmentConverter(mutableSegmentImpl, segmentZKPropsConfig,
outputDir.getAbsolutePath(), schema,
+ tableNameWithType, tableConfig, segmentName, cdc, false);
+ converter.build(SegmentVersion.v3, null);
+
+ File indexDir = new File(outputDir, segmentName);
+ SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
+ assertEquals(segmentMetadata.getTotalDocs(), 0);
+ assertEquals(segmentMetadata.getTimeColumn(), DATE_TIME_COLUMN);
+ assertEquals(segmentMetadata.getTimeUnit(), TimeUnit.MILLISECONDS);
+ assertEquals(segmentMetadata.getStartTime(), segmentMetadata.getEndTime());
+
assertTrue(segmentMetadata.getAllColumns().containsAll(schema.getColumnNames()));
+ assertEquals(segmentMetadata.getStartOffset(), "1");
+ assertEquals(segmentMetadata.getEndOffset(), "100");
+ }
+
+ @Test
+ public void test10RecordsIndexedColumnMajorSegmentBuilder()
+ throws Exception {
+ File tmpDir = new File(TMP_DIR, "tmp_" + System.currentTimeMillis());
+ TableConfig tableConfig =
+ new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable")
+ .setTimeColumnName(DATE_TIME_COLUMN)
+ .setInvertedIndexColumns(Lists.newArrayList(STRING_COLUMN1,
LONG_COLUMN1))
+ .setSortedColumn(LONG_COLUMN1)
+ .setRangeIndexColumns(Lists.newArrayList(STRING_COLUMN2))
+ .setNoDictionaryColumns(Lists.newArrayList(LONG_COLUMN2))
+ .setVarLengthDictionaryColumns(Lists.newArrayList(STRING_COLUMN3))
+ .setOnHeapDictionaryColumns(Lists.newArrayList(LONG_COLUMN3))
+ .setColumnMajorSegmentBuilderEnabled(true)
+ .build();
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension(STRING_COLUMN1, FieldSpec.DataType.STRING)
+ .addSingleValueDimension(STRING_COLUMN2, FieldSpec.DataType.STRING)
+ .addSingleValueDimension(STRING_COLUMN3, FieldSpec.DataType.STRING)
+ .addSingleValueDimension(STRING_COLUMN4, FieldSpec.DataType.STRING)
+ .addSingleValueDimension(LONG_COLUMN1, FieldSpec.DataType.LONG)
+ .addSingleValueDimension(LONG_COLUMN2, FieldSpec.DataType.LONG)
+ .addSingleValueDimension(LONG_COLUMN3, FieldSpec.DataType.LONG)
+ .addMultiValueDimension(MV_INT_COLUMN, FieldSpec.DataType.INT)
+ .addMetric(LONG_COLUMN4, FieldSpec.DataType.LONG)
+ .addDateTime(DATE_TIME_COLUMN, FieldSpec.DataType.LONG,
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
+ .build();
+
+ String tableNameWithType = tableConfig.getTableName();
+ String segmentName = "testTable__0__0__123456";
+ IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+
+ DictionaryIndexConfig varLengthDictConf = new DictionaryIndexConfig(false,
true);
+
+ RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
+ new
RealtimeSegmentConfig.Builder().setTableNameWithType(tableNameWithType).setSegmentName(segmentName)
+
.setStreamName(tableNameWithType).setSchema(schema).setTimeColumnName(DATE_TIME_COLUMN).setCapacity(1000)
+ .setAvgNumMultiValues(3)
+ .setIndex(Sets.newHashSet(LONG_COLUMN2),
StandardIndexes.dictionary(), DictionaryIndexConfig.DISABLED)
+ .setIndex(Sets.newHashSet(Sets.newHashSet(STRING_COLUMN3)),
StandardIndexes.dictionary(), varLengthDictConf)
+ .setIndex(Sets.newHashSet(STRING_COLUMN1, LONG_COLUMN1),
StandardIndexes.inverted(), IndexConfig.ENABLED)
+
.setSegmentZKMetadata(getSegmentZKMetadata(segmentName)).setOffHeap(true)
+ .setMemoryManager(new DirectMemoryManager(segmentName))
+ .setStatsHistory(RealtimeSegmentStatsHistory.deserialzeFrom(new
File(tmpDir, "stats")))
+ .setConsumerDir(new File(tmpDir, "consumerDir").getAbsolutePath());
+
+ // create mutable segment impl
+ MutableSegmentImpl mutableSegmentImpl = new
MutableSegmentImpl(realtimeSegmentConfigBuilder.build(), null);
+ List<GenericRow> rows = generateTestData();
+
+ for (GenericRow row : rows) {
+ mutableSegmentImpl.index(row, null);
+ }
+
+ File outputDir = new File(tmpDir, "outputDir");
+ SegmentZKPropsConfig segmentZKPropsConfig = new SegmentZKPropsConfig();
+ segmentZKPropsConfig.setStartOffset("1");
+ segmentZKPropsConfig.setEndOffset("100");
+ ColumnIndicesForRealtimeTable cdc = new
ColumnIndicesForRealtimeTable(indexingConfig.getSortedColumn().get(0),
+ indexingConfig.getInvertedIndexColumns(), null, null,
indexingConfig.getNoDictionaryColumns(),
+ indexingConfig.getVarLengthDictionaryColumns());
+ RealtimeSegmentConverter converter =
+ new RealtimeSegmentConverter(mutableSegmentImpl, segmentZKPropsConfig,
outputDir.getAbsolutePath(), schema,
+ tableNameWithType, tableConfig, segmentName, cdc, false);
+ converter.build(SegmentVersion.v3, null);
+
+ File indexDir = new File(outputDir, segmentName);
+ SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
+ assertEquals(segmentMetadata.getVersion(), SegmentVersion.v3);
+ assertEquals(segmentMetadata.getTotalDocs(), rows.size());
+ assertEquals(segmentMetadata.getTimeColumn(), DATE_TIME_COLUMN);
+ assertEquals(segmentMetadata.getTimeUnit(), TimeUnit.MILLISECONDS);
+
+ long expectedStartTime = (long) rows.get(0).getValue(DATE_TIME_COLUMN);
+ assertEquals(segmentMetadata.getStartTime(), expectedStartTime);
+ long expectedEndTime = (long) rows.get(rows.size() -
1).getValue(DATE_TIME_COLUMN);
+ assertEquals(segmentMetadata.getEndTime(), expectedEndTime);
+
+
assertTrue(segmentMetadata.getAllColumns().containsAll(schema.getColumnNames()));
+ assertEquals(segmentMetadata.getStartOffset(), "1");
+ assertEquals(segmentMetadata.getEndOffset(), "100");
+
+ testSegment(rows, indexDir, tableConfig, segmentMetadata);
+ }
+
+ private void testSegment(List<GenericRow> rows, File indexDir,
+ TableConfig tableConfig, SegmentMetadataImpl segmentMetadata)
+ throws IOException {
+ SegmentLocalFSDirectory segmentDir = new SegmentLocalFSDirectory(indexDir,
segmentMetadata, ReadMode.mmap);
+ SegmentDirectory.Reader segmentReader = segmentDir.createReader();
+
+ Map<String, ColumnIndexContainer> indexContainerMap = new HashMap<>();
+ Map<String, ColumnMetadata> columnMetadataMap =
segmentMetadata.getColumnMetadataMap();
+ IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null,
tableConfig);
+ for (Map.Entry<String, ColumnMetadata> entry :
columnMetadataMap.entrySet()) {
+ indexContainerMap.put(entry.getKey(),
+ new PhysicalColumnIndexContainer(segmentReader, entry.getValue(),
indexLoadingConfig));
+ }
+ ImmutableSegmentImpl segmentFile = new ImmutableSegmentImpl(segmentDir,
segmentMetadata, indexContainerMap, null);
+
+ GenericRow readRow = new GenericRow();
+ int docId = 0;
+ for (GenericRow row : rows) {
+ segmentFile.getRecord(docId, readRow);
+ assertEquals(readRow.getValue(STRING_COLUMN1),
row.getValue(STRING_COLUMN1));
+ assertEquals(readRow.getValue(STRING_COLUMN2),
row.getValue(STRING_COLUMN2));
+ assertEquals(readRow.getValue(STRING_COLUMN3),
row.getValue(STRING_COLUMN3));
+ assertEquals(readRow.getValue(STRING_COLUMN4),
row.getValue(STRING_COLUMN4));
+ assertEquals(readRow.getValue(LONG_COLUMN1), row.getValue(LONG_COLUMN1));
+ assertEquals(readRow.getValue(LONG_COLUMN2), row.getValue(LONG_COLUMN2));
+ assertEquals(readRow.getValue(LONG_COLUMN3), row.getValue(LONG_COLUMN3));
+ assertEquals(readRow.getValue(LONG_COLUMN4), row.getValue(LONG_COLUMN4));
+ assertEquals(readRow.getValue(MV_INT_COLUMN),
row.getValue(MV_INT_COLUMN));
+ assertEquals(readRow.getValue(DATE_TIME_COLUMN),
row.getValue(DATE_TIME_COLUMN));
+
+ docId += 1;
+ }
+ }
+
+ private List<GenericRow> generateTestData() {
+ LinkedList<GenericRow> rows = new LinkedList<>();
+
+ for (int i = 0; i < 10; i++) {
+ GenericRow row = new GenericRow();
+ row.putValue(STRING_COLUMN1, "Hello" + i);
+ row.putValue(STRING_COLUMN2, "World" + i);
+ row.putValue(STRING_COLUMN3, "F1" + i);
+ row.putValue(STRING_COLUMN4, "F2" + i);
+ row.putValue(LONG_COLUMN1, 67L + i);
+ row.putValue(LONG_COLUMN2, 66L + i);
+ row.putValue(LONG_COLUMN3, 65L + i);
+ row.putValue(LONG_COLUMN4, 64L + i);
+ List<Integer> intList = new ArrayList<>();
+ intList.add(100 + i);
+ intList.add(200 + i);
+ row.putValue(MV_INT_COLUMN, intList.toArray());
+ row.putValue(DATE_TIME_COLUMN, 1697814309L + i);
+ rows.add(row);
+ }
+
+ return rows;
+ }
+
private SegmentZKMetadata getSegmentZKMetadata(String segmentName) {
SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(segmentName);
segmentZKMetadata.setCreationTime(System.currentTimeMillis());
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
index c0a29afcf0..a6e2d067b6 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
@@ -54,6 +54,7 @@ public class RealtimeSegmentConverter {
private final String _segmentName;
private final ColumnIndicesForRealtimeTable _columnIndicesForRealtimeTable;
private final boolean _nullHandlingEnabled;
+ private final boolean _enableColumnMajor;
public RealtimeSegmentConverter(MutableSegmentImpl realtimeSegment,
SegmentZKPropsConfig segmentZKPropsConfig,
String outputPath, Schema schema, String tableName, TableConfig
tableConfig, String segmentName,
@@ -70,11 +71,19 @@ public class RealtimeSegmentConverter {
_tableConfig = tableConfig;
_segmentName = segmentName;
_nullHandlingEnabled = nullHandlingEnabled;
+ if (_tableConfig.getIngestionConfig() != null
+ && _tableConfig.getIngestionConfig().getStreamIngestionConfig() !=
null) {
+ _enableColumnMajor = _tableConfig.getIngestionConfig()
+ .getStreamIngestionConfig().getColumnMajorSegmentBuilderEnabled();
+ } else {
+ _enableColumnMajor =
_tableConfig.getIndexingConfig().isColumnMajorSegmentBuilderEnabled();
+ }
}
public void build(@Nullable SegmentVersion segmentVersion, ServerMetrics
serverMetrics)
throws Exception {
SegmentGeneratorConfig genConfig = new
SegmentGeneratorConfig(_tableConfig, _dataSchema);
+
// The segment generation code in SegmentColumnarIndexCreator will throw
// exception if start and end time in time column are not in acceptable
// range. We don't want the realtime consumption to stop (if an exception
@@ -110,15 +119,19 @@ public class RealtimeSegmentConverter {
SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
try (PinotSegmentRecordReader recordReader = new
PinotSegmentRecordReader()) {
- int[] sortedDocIds =
- _columnIndicesForRealtimeTable.getSortedColumn() != null
- ?
_realtimeSegmentImpl.getSortedDocIdIterationOrderWithSortedColumn(
- _columnIndicesForRealtimeTable.getSortedColumn()) : null;
+ int[] sortedDocIds = _columnIndicesForRealtimeTable.getSortedColumn() !=
null
+ ? _realtimeSegmentImpl.getSortedDocIdIterationOrderWithSortedColumn(
+ _columnIndicesForRealtimeTable.getSortedColumn()) : null;
recordReader.init(_realtimeSegmentImpl, sortedDocIds);
RealtimeSegmentSegmentCreationDataSource dataSource =
new RealtimeSegmentSegmentCreationDataSource(_realtimeSegmentImpl,
recordReader);
driver.init(genConfig, dataSource,
TransformPipeline.getPassThroughPipeline());
- driver.build();
+
+ if (!_enableColumnMajor) {
+ driver.build();
+ } else {
+ driver.buildByColumn(_realtimeSegmentImpl);
+ }
}
if (segmentPartitionConfig != null) {
@@ -130,8 +143,8 @@ public class RealtimeSegmentConverter {
}
}
- private <C extends IndexConfig> void
addIndexOrDefault(SegmentGeneratorConfig genConfig,
- IndexType<C, ?, ?> indexType, @Nullable Collection<String> columns, C
defaultConfig) {
+ private <C extends IndexConfig> void
addIndexOrDefault(SegmentGeneratorConfig genConfig, IndexType<C, ?, ?>
indexType,
+ @Nullable Collection<String> columns, C defaultConfig) {
Map<String, C> config = indexType.getConfig(genConfig.getTableConfig(),
genConfig.getSchema());
if (columns != null) {
for (String column : columns) {
@@ -154,4 +167,8 @@ public class RealtimeSegmentConverter {
}
return newSchema;
}
+
+ public boolean isColumnMajorEnabled() {
+ return _enableColumnMajor;
+ }
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
index a84c275299..10e0a5c1d0 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -40,6 +40,8 @@ import org.apache.pinot.segment.local.segment.creator.impl.nullvalue.NullValueVe
import
org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexPlugin;
import
org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexType;
import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexType;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
+import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.creator.ColumnIndexCreationInfo;
@@ -336,6 +338,57 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
_docIdCounter++;
}
+ @Override
+ public void indexColumn(String columnName, @Nullable int[] sortedDocIds,
IndexSegment segment)
+ throws IOException {
+ // Iterate over each value in the column
+ int numDocs = segment.getSegmentMetadata().getTotalDocs();
+ if (numDocs == 0) {
+ return;
+ }
+
+ try (PinotSegmentColumnReader colReader = new
PinotSegmentColumnReader(segment, columnName)) {
+ Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex =
_creatorsByColAndIndex.get(columnName);
+ NullValueVectorCreator nullVec =
_nullValueVectorCreatorMap.get(columnName);
+ FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName);
+ SegmentDictionaryCreator dictionaryCreator =
_dictionaryCreatorMap.get(columnName);
+ if (sortedDocIds != null) {
+ int onDiskDocId = 0;
+ for (int docId : sortedDocIds) {
+ indexColumnValue(colReader, creatorsByIndex, columnName, fieldSpec,
dictionaryCreator, docId, onDiskDocId,
+ nullVec);
+ onDiskDocId++;
+ }
+ } else {
+ for (int docId = 0; docId < numDocs; docId++) {
+ indexColumnValue(colReader, creatorsByIndex, columnName, fieldSpec,
dictionaryCreator, docId, docId, nullVec);
+ }
+ }
+ }
+ }
+
+ private void indexColumnValue(PinotSegmentColumnReader colReader,
+ Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex, String
columnName, FieldSpec fieldSpec,
+ SegmentDictionaryCreator dictionaryCreator, int sourceDocId, int
onDiskDocPos, NullValueVectorCreator nullVec)
+ throws IOException {
+ Object columnValueToIndex = colReader.getValue(sourceDocId);
+ if (columnValueToIndex == null) {
+ throw new RuntimeException("Null value for column:" + columnName);
+ }
+
+ if (fieldSpec.isSingleValueField()) {
+ indexSingleValueRow(dictionaryCreator, columnValueToIndex,
creatorsByIndex);
+ } else {
+ indexMultiValueRow(dictionaryCreator, (Object[]) columnValueToIndex,
creatorsByIndex);
+ }
+
+ if (_nullHandlingEnabled) {
+ if (colReader.isNull(sourceDocId)) {
+ nullVec.setNull(onDiskDocPos);
+ }
+ }
+ }
+
private void indexSingleValueRow(SegmentDictionaryCreator dictionaryCreator,
Object value,
Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex)
throws IOException {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
index 0293e644c4..129bc35ccf 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
@@ -29,7 +29,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import java.util.TreeSet;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.collections.CollectionUtils;
@@ -45,6 +47,7 @@ import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
import org.apache.pinot.segment.local.startree.v2.builder.MultipleTreesBuilder;
import org.apache.pinot.segment.local.utils.CrcUtils;
import org.apache.pinot.segment.local.utils.IngestionUtils;
+import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.converter.SegmentFormatConverter;
import org.apache.pinot.segment.spi.creator.ColumnIndexCreationInfo;
@@ -102,7 +105,7 @@ public class SegmentIndexCreationDriverImpl implements
SegmentIndexCreationDrive
private int _totalDocs = 0;
private File _tempIndexDir;
private String _segmentName;
- private long _totalRecordReadTime = 0;
+ private long _totalRecordReadTimeNs = 0;
private long _totalIndexTime = 0;
private long _totalStatsCollectorTime = 0;
private boolean _continueOnError;
@@ -122,8 +125,8 @@ public class SegmentIndexCreationDriverImpl implements
SegmentIndexCreationDrive
TableConfig tableConfig = segmentGeneratorConfig.getTableConfig();
FileFormat fileFormat = segmentGeneratorConfig.getFormat();
String recordReaderClassName =
segmentGeneratorConfig.getRecordReaderPath();
- Set<String> sourceFields = IngestionUtils
- .getFieldsForRecordExtractor(tableConfig.getIngestionConfig(),
segmentGeneratorConfig.getSchema());
+ Set<String> sourceFields =
IngestionUtils.getFieldsForRecordExtractor(tableConfig.getIngestionConfig(),
+ segmentGeneratorConfig.getSchema());
// Allow for instantiation general record readers from a record reader
path passed into segment generator config
// If this is set, this will override the file format
@@ -142,8 +145,8 @@ public class SegmentIndexCreationDriverImpl implements
SegmentIndexCreationDrive
if (fileFormat == FileFormat.PINOT) {
return new PinotSegmentRecordReader(dataFile, schema,
segmentGeneratorConfig.getColumnSortOrder());
} else {
- return RecordReaderFactory
- .getRecordReader(fileFormat, dataFile, sourceFields,
segmentGeneratorConfig.getReaderConfig());
+ return RecordReaderFactory.getRecordReader(fileFormat, dataFile,
sourceFields,
+ segmentGeneratorConfig.getReaderConfig());
}
}
@@ -199,8 +202,9 @@ public class SegmentIndexCreationDriverImpl implements
SegmentIndexCreationDrive
indexDir.mkdirs();
}
- _ingestionSchemaValidator = SchemaValidatorFactory
- .getSchemaValidator(_dataSchema, _recordReader.getClass().getName(),
config.getInputFilePath());
+ _ingestionSchemaValidator =
+ SchemaValidatorFactory.getSchemaValidator(_dataSchema,
_recordReader.getClass().getName(),
+ config.getInputFilePath());
// Create a temporary directory used in segment creation
_tempIndexDir = new File(indexDir, "tmp-" + UUID.randomUUID());
@@ -229,16 +233,21 @@ public class SegmentIndexCreationDriverImpl implements
SegmentIndexCreationDrive
GenericRow reuse = new GenericRow();
TransformPipeline.Result reusedResult = new TransformPipeline.Result();
while (_recordReader.hasNext()) {
- long recordReadStartTime = System.currentTimeMillis();
- long recordReadStopTime = System.currentTimeMillis();
+ long recordReadStopTime = System.nanoTime();
long indexStopTime;
reuse.clear();
+
try {
+ long recordReadStartTime = System.nanoTime();
GenericRow decodedRow = _recordReader.next(reuse);
- recordReadStartTime = System.currentTimeMillis();
+ recordReadStartTime = System.nanoTime();
+
+ // Should not be needed anymore.
+ // Add row to indexes
_transformPipeline.processRow(decodedRow, reusedResult);
- recordReadStopTime = System.currentTimeMillis();
- _totalRecordReadTime += (recordReadStopTime - recordReadStartTime);
+
+ recordReadStopTime = System.nanoTime();
+ _totalRecordReadTimeNs += (recordReadStopTime - recordReadStartTime);
} catch (Exception e) {
if (!_continueOnError) {
throw new RuntimeException("Error occurred while reading row
during indexing", e);
@@ -273,6 +282,47 @@ public class SegmentIndexCreationDriverImpl implements
SegmentIndexCreationDrive
handlePostCreation();
}
+  /**
+   * Builds the segment column by column: every physical column of the schema is indexed in a single call by reading
+   * its values directly from {@code indexSegment}, instead of transforming and indexing the data row by row.
+   *
+   * <p>Per-column statistics are still gathered up front via {@code buildIndexCreationInfo()}. The record reader
+   * created by {@code init} is only used here for its sorted doc id order, and it is closed before this method
+   * returns (or throws).
+   *
+   * @param indexSegment segment the column values are read from
+   * @throws IllegalStateException if the configured record reader is not a {@link PinotSegmentRecordReader}
+   * @throws Exception if statistics collection, index creation or post-creation handling fails
+   */
+  public void buildByColumn(IndexSegment indexSegment)
+      throws Exception {
+    // Count the number of documents and gather per-column statistics
+    LOGGER.debug("Start building StatsCollector!");
+    buildIndexCreationInfo();
+    LOGGER.info("Finished building StatsCollector!");
+    LOGGER.info("Collected stats for {} documents", _totalDocs);
+
+    try {
+      // Initialize the index creation using the per-column statistics information
+      // TODO: _indexCreationInfoMap holds the reference to all unique values on heap (ColumnIndexCreationInfo ->
+      //       ColumnStatistics) throughout the segment creation. Find a way to release the memory early.
+      _indexCreator.init(_config, _segmentIndexCreationInfo, _indexCreationInfoMap, _dataSchema, _tempIndexDir);
+
+      // Build the indexes
+      LOGGER.info("Start building Index by column");
+
+      TreeSet<String> columns = _dataSchema.getPhysicalColumnNames();
+
+      // TODO: Eventually pull the doc Id sorting logic out of Record Reader so that all row oriented logic can be
+      //       removed from this code.
+      // The sorted doc ids are only exposed by PinotSegmentRecordReader. Fail with a descriptive message instead of
+      // an opaque ClassCastException when a different reader was configured.
+      Preconditions.checkState(_recordReader instanceof PinotSegmentRecordReader,
+          "Column major segment building requires a PinotSegmentRecordReader, but got: %s",
+          _recordReader.getClass().getName());
+      int[] sortedDocIds = ((PinotSegmentRecordReader) _recordReader).getSortedDocIds();
+      for (String col : columns) {
+        _indexCreator.indexColumn(col, sortedDocIds, indexSegment);
+      }
+    } catch (Exception e) {
+      // Release the index creator's resources, but never let a failure while closing hide the original error.
+      try {
+        _indexCreator.close();
+      } catch (Exception closeException) {
+        e.addSuppressed(closeException);
+      }
+      throw e;
+    } finally {
+      // The record reader is created by the `init` method and needs to be closed and
+      // cleaned up even by the Column Mode builder.
+      _recordReader.close();
+    }
+
+    // TODO: Using column oriented, we can't catch incomplete records. Does that matter?
+
+    LOGGER.info("Finished records indexing by column in IndexCreator!");
+
+    handlePostCreation();
+  }
+
private void handlePostCreation()
throws Exception {
ColumnStatistics timeColumnStatistics =
_segmentStats.getColumnProfileFor(_config.getTimeColumnName());
@@ -285,8 +335,8 @@ public class SegmentIndexCreationDriverImpl implements
SegmentIndexCreationDrive
// When totalDoc is 0, check whether 'failOnEmptySegment' option is
true. If so, directly fail the segment
// creation.
Preconditions.checkArgument(!_config.isFailOnEmptySegment(),
- "Failing the empty segment creation as the option
'failOnEmptySegment' is set to: " + _config
- .isFailOnEmptySegment());
+ "Failing the empty segment creation as the option
'failOnEmptySegment' is set to: "
+ + _config.isFailOnEmptySegment());
// Generate a unique name for a segment with no rows
long now = System.currentTimeMillis();
_segmentName =
_config.getSegmentNameGenerator().generateSegmentName(sequenceId, now, now);
@@ -344,12 +394,13 @@ public class SegmentIndexCreationDriverImpl implements
SegmentIndexCreationDrive
// Persist creation metadata to disk
persistCreationMeta(segmentOutputDir, crc, creationTime);
- LOGGER.info("Driver, record read time : {}", _totalRecordReadTime);
+ LOGGER.info("Driver, record read time : {}",
TimeUnit.NANOSECONDS.toMillis(_totalRecordReadTimeNs));
LOGGER.info("Driver, stats collector time : {}", _totalStatsCollectorTime);
LOGGER.info("Driver, indexing time : {}", _totalIndexTime);
}
- private void updatePostSegmentCreationIndexes(File indexDir) throws
Exception {
+ private void updatePostSegmentCreationIndexes(File indexDir)
+ throws Exception {
Set<IndexType> postSegCreationIndexes =
IndexService.getInstance().getAllIndexes().stream()
.filter(indexType -> indexType.getIndexBuildLifecycle() ==
IndexType.BuildLifecycle.POST_SEGMENT_CREATION)
.collect(Collectors.toSet());
@@ -459,11 +510,11 @@ public class SegmentIndexCreationDriverImpl implements
SegmentIndexCreationDrive
if (storedType == DataType.BYTES) {
defaultNullValue = new ByteArray((byte[]) defaultNullValue);
}
- boolean createDictionary = !rawIndexCreationColumns.contains(columnName)
- && !rawIndexCompressionTypeKeys.contains(columnName);
+ boolean createDictionary =
+ !rawIndexCreationColumns.contains(columnName) &&
!rawIndexCompressionTypeKeys.contains(columnName);
_indexCreationInfoMap.put(columnName,
- new ColumnIndexCreationInfo(columnProfile, createDictionary,
useVarLengthDictionary,
- false/*isAutoGenerated*/, defaultNullValue));
+ new ColumnIndexCreationInfo(columnProfile, createDictionary,
useVarLengthDictionary, false/*isAutoGenerated*/,
+ defaultNullValue));
}
_segmentIndexCreationInfo.setTotalDocs(_totalDocs);
}
@@ -477,8 +528,8 @@ public class SegmentIndexCreationDriverImpl implements
SegmentIndexCreationDrive
@Deprecated
public static boolean shouldUseVarLengthDictionary(String columnName,
Set<String> varLengthDictColumns,
DataType columnStoredType, ColumnStatistics columnProfile) {
- return DictionaryIndexType.shouldUseVarLengthDictionary(
- columnName, varLengthDictColumns, columnStoredType, columnProfile);
+ // Deprecated wrapper: delegates directly to DictionaryIndexType.shouldUseVarLengthDictionary with the same arguments.
+ return DictionaryIndexType.shouldUseVarLengthDictionary(columnName,
varLengthDictColumns, columnStoredType,
+ columnProfile);
}
/**
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentCreator.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentCreator.java
index bc9420658f..ecc3f68ab3 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentCreator.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentCreator.java
@@ -23,7 +23,9 @@ import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.TreeMap;
+import javax.annotation.Nullable;
import org.apache.commons.configuration.ConfigurationException;
+import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.index.creator.SegmentIndexCreationInfo;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
@@ -55,6 +57,16 @@ public interface SegmentCreator extends Closeable,
Serializable {
void indexRow(GenericRow row)
throws IOException;
+ /**
+ * Indexes all values of a single column at once, reading them from {@code segment}. This is the column-major
+ * counterpart of {@link #indexRow(GenericRow)}.
+ *
+ * @param columnName - The name of the column to index.
+ * @param sortedDocIds - If not null, the sorted order in which the documents of {@code segment} are indexed;
+ * presumably {@code sortedDocIds[i]} is the source doc id written at output position {@code i} (TODO confirm
+ * against the implementation). If null, documents are presumably indexed in their original order.
+ * @param segment - The segment the column values are read from.
+ * @throws IOException if an I/O error occurs while indexing the column.
+ */
+ void indexColumn(String columnName, @Nullable int[] sortedDocIds,
IndexSegment segment)
+ throws IOException;
+
/**
* Sets the name of the segment.
*
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
index a5a10b8ae5..963af07a5c 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
@@ -66,8 +66,6 @@ import
org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.TimestampIndexUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
@@ -78,9 +76,7 @@ public class SegmentGeneratorConfig implements Serializable {
EPOCH, SIMPLE_DATE
}
- private static final Logger LOGGER =
LoggerFactory.getLogger(SegmentGeneratorConfig.class);
public static final String GENERATE_INV_BEFORE_PUSH_DEPREC_PROP =
"generate.inverted.index.before.push";
-
private final TableConfig _tableConfig;
// NOTE: Use TreeMap to guarantee the order. The custom properties will be
written into the segment metadata.
private final TreeMap<String, String> _customProperties = new TreeMap<>();
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
index 75599cb2e6..9b7930cd74 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
@@ -61,6 +61,7 @@ public class IndexingConfig extends BaseJsonConfig {
private SegmentPartitionConfig _segmentPartitionConfig;
private boolean _aggregateMetrics;
private boolean _nullHandlingEnabled;
+ private boolean _columnMajorSegmentBuilderEnabled = false;
/**
* If `optimizeDictionary` enabled, dictionary is not created for the
high-cardinality
@@ -316,6 +317,14 @@ public class IndexingConfig extends BaseJsonConfig {
_nullHandlingEnabled = nullHandlingEnabled;
}
+ /**
+ * Returns whether segments should be built column by column (column major mode) instead of row by row.
+ * Defaults to {@code false}.
+ */
+ public boolean isColumnMajorSegmentBuilderEnabled() {
+ return _columnMajorSegmentBuilderEnabled;
+ }
+
+ /**
+ * Enables or disables column major segment building.
+ *
+ * @param columnMajorSegmentBuilderEnabled {@code true} to build segments column by column
+ */
+ public void setColumnMajorSegmentBuilderEnabled(boolean
columnMajorSegmentBuilderEnabled) {
+ _columnMajorSegmentBuilderEnabled = columnMajorSegmentBuilderEnabled;
+ }
+
public boolean isOptimizeDictionary() {
return _optimizeDictionary;
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java
index 0f4caa9b24..365911ee69 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java
@@ -34,6 +34,9 @@ public class StreamIngestionConfig extends BaseJsonConfig {
@JsonPropertyDescription("All configs for the streams from which to ingest")
private final List<Map<String, String>> _streamConfigMaps;
+ @JsonPropertyDescription("Whether to use column major mode when creating the
segment.")
+ private boolean _columnMajorSegmentBuilderEnabled;
+
@JsonCreator
public StreamIngestionConfig(@JsonProperty("streamConfigMaps")
List<Map<String, String>> streamConfigMaps) {
_streamConfigMaps = streamConfigMaps;
@@ -42,4 +45,12 @@ public class StreamIngestionConfig extends BaseJsonConfig {
public List<Map<String, String>> getStreamConfigMaps() {
return _streamConfigMaps;
}
+
+ /**
+ * Sets whether column major mode is used when creating segments from this stream.
+ *
+ * @param enableColumnMajorSegmentCreation {@code true} to build segments column by column
+ */
+ public void setColumnMajorSegmentBuilderEnabled(boolean
enableColumnMajorSegmentCreation) {
+ _columnMajorSegmentBuilderEnabled = enableColumnMajorSegmentCreation;
+ }
+
+ /**
+ * Returns whether column major mode is used when creating segments from this stream.
+ */
+ // NOTE(review): IndexingConfig exposes the equivalent flag as isColumnMajorSegmentBuilderEnabled(); this accessor
+ // uses a `get` prefix. Renaming would break existing callers, so any alignment needs a deprecation path.
+ public boolean getColumnMajorSegmentBuilderEnabled() {
+ return _columnMajorSegmentBuilderEnabled;
+ }
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
index 64b8014b81..a65b4fc169 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
@@ -98,6 +98,7 @@ public class TableConfigBuilder {
private Map<String, String> _streamConfigs;
private SegmentPartitionConfig _segmentPartitionConfig;
private boolean _nullHandlingEnabled;
+ private boolean _columnMajorSegmentBuilderEnabled;
private List<String> _varLengthDictionaryColumns;
private List<StarTreeIndexConfig> _starTreeIndexConfigs;
private List<String> _jsonIndexColumns;
@@ -328,6 +329,11 @@ public class TableConfigBuilder {
return this;
}
+ /**
+ * Sets whether segments of the table should be built in column major mode. The value is copied into the
+ * {@code IndexingConfig} of the table config produced by this builder.
+ *
+ * @param columnMajorSegmentBuilderEnabled {@code true} to build segments column by column
+ * @return this builder
+ */
+ public TableConfigBuilder setColumnMajorSegmentBuilderEnabled(boolean
columnMajorSegmentBuilderEnabled) {
+ _columnMajorSegmentBuilderEnabled = columnMajorSegmentBuilderEnabled;
+ return this;
+ }
+
public TableConfigBuilder setCustomConfig(TableCustomConfig customConfig) {
_customConfig = customConfig;
return this;
@@ -451,6 +457,7 @@ public class TableConfigBuilder {
indexingConfig.setStreamConfigs(_streamConfigs);
indexingConfig.setSegmentPartitionConfig(_segmentPartitionConfig);
indexingConfig.setNullHandlingEnabled(_nullHandlingEnabled);
+
indexingConfig.setColumnMajorSegmentBuilderEnabled(_columnMajorSegmentBuilderEnabled);
indexingConfig.setVarLengthDictionaryColumns(_varLengthDictionaryColumns);
indexingConfig.setStarTreeIndexConfigs(_starTreeIndexConfigs);
indexingConfig.setJsonIndexColumns(_jsonIndexColumns);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]