This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 88edfd49cc Update Segment builder to use column major tables (#11776)
88edfd49cc is described below

commit 88edfd49cc27042ae923e053cd47f1bec7fb5663
Author: Erich <[email protected]>
AuthorDate: Sun Oct 22 13:05:59 2023 -0400

    Update Segment builder to use column major tables (#11776)
---
 .../realtime/RealtimeSegmentDataManager.java       |   6 +-
 .../converter/RealtimeSegmentConverterTest.java    | 319 ++++++++++++++++++++-
 .../converter/RealtimeSegmentConverter.java        |  31 +-
 .../creator/impl/SegmentColumnarIndexCreator.java  |  53 ++++
 .../impl/SegmentIndexCreationDriverImpl.java       |  95 ++++--
 .../pinot/segment/spi/creator/SegmentCreator.java  |  12 +
 .../spi/creator/SegmentGeneratorConfig.java        |   4 -
 .../pinot/spi/config/table/IndexingConfig.java     |   9 +
 .../table/ingestion/StreamIngestionConfig.java     |  11 +
 .../spi/utils/builder/TableConfigBuilder.java      |   7 +
 10 files changed, 507 insertions(+), 40 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index e772eae7e8..f3e8fc588e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -934,7 +934,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
       SegmentZKPropsConfig segmentZKPropsConfig = new SegmentZKPropsConfig();
       segmentZKPropsConfig.setStartOffset(_segmentZKMetadata.getStartOffset());
       segmentZKPropsConfig.setEndOffset(_currentOffset.toString());
-      // lets convert the segment now
+      // let's convert the segment now
       RealtimeSegmentConverter converter =
           new RealtimeSegmentConverter(_realtimeSegment, segmentZKPropsConfig, 
tempSegmentFolder.getAbsolutePath(),
               _schema, _tableNameWithType, _tableConfig, 
_segmentZKMetadata.getSegmentName(),
@@ -951,8 +951,8 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
       }
       final long buildTimeMillis = now() - lockAcquireTimeMillis;
       final long waitTimeMillis = lockAcquireTimeMillis - startTimeMillis;
-      _segmentLogger
-          .info("Successfully built segment in {} ms, after lockWaitTime {} 
ms", buildTimeMillis, waitTimeMillis);
+      _segmentLogger.info("Successfully built segment (Column Mode: {}) in {} 
ms, after lockWaitTime {} ms",
+          converter.isColumnMajorEnabled(), buildTimeMillis, waitTimeMillis);
 
       File dataDir = new File(_resourceDataDir);
       File indexDir = new File(dataDir, _segmentNameStr);
diff --git a/pinot-core/src/test/java/org/apache/pinot/realtime/converter/RealtimeSegmentConverterTest.java b/pinot-core/src/test/java/org/apache/pinot/realtime/converter/RealtimeSegmentConverterTest.java
index 28df031be2..50f1396c10 100644
--- a/pinot-core/src/test/java/org/apache/pinot/realtime/converter/RealtimeSegmentConverterTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/realtime/converter/RealtimeSegmentConverterTest.java
@@ -22,20 +22,33 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
 import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
 import org.apache.pinot.segment.local.io.writer.impl.DirectMemoryManager;
 import 
org.apache.pinot.segment.local.realtime.converter.ColumnIndicesForRealtimeTable;
 import 
org.apache.pinot.segment.local.realtime.converter.RealtimeSegmentConverter;
 import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
 import 
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
+import 
org.apache.pinot.segment.local.segment.index.column.PhysicalColumnIndexContainer;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.segment.store.SegmentLocalFSDirectory;
 import 
org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProviderFactory;
+import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
 import org.apache.pinot.segment.spi.index.DictionaryIndexConfig;
 import org.apache.pinot.segment.spi.index.StandardIndexes;
+import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.segment.spi.store.SegmentDirectory;
 import org.apache.pinot.spi.config.table.IndexConfig;
 import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.SegmentZKPropsConfig;
@@ -44,6 +57,8 @@ import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.TimeGranularitySpec;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.ReadMode;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.annotations.Test;
 
@@ -84,7 +99,7 @@ public class RealtimeSegmentConverterTest {
   }
 
   @Test
-  public void testNoRecordsIndexed()
+  public void testNoRecordsIndexedRowMajorSegmentBuilder()
       throws Exception {
     File tmpDir = new File(TMP_DIR, "tmp_" + System.currentTimeMillis());
     TableConfig tableConfig =
@@ -93,7 +108,8 @@ public class RealtimeSegmentConverterTest {
             .setRangeIndexColumns(Lists.newArrayList(STRING_COLUMN2))
             .setNoDictionaryColumns(Lists.newArrayList(LONG_COLUMN2))
             .setVarLengthDictionaryColumns(Lists.newArrayList(STRING_COLUMN3))
-            
.setOnHeapDictionaryColumns(Lists.newArrayList(LONG_COLUMN3)).build();
+            
.setOnHeapDictionaryColumns(Lists.newArrayList(LONG_COLUMN3)).setColumnMajorSegmentBuilderEnabled(false)
+            .build();
     Schema schema = new 
Schema.SchemaBuilder().addSingleValueDimension(STRING_COLUMN1, 
FieldSpec.DataType.STRING)
         .addSingleValueDimension(STRING_COLUMN2, FieldSpec.DataType.STRING)
         .addSingleValueDimension(STRING_COLUMN3, FieldSpec.DataType.STRING)
@@ -130,8 +146,8 @@ public class RealtimeSegmentConverterTest {
     segmentZKPropsConfig.setStartOffset("1");
     segmentZKPropsConfig.setEndOffset("100");
     ColumnIndicesForRealtimeTable cdc = new 
ColumnIndicesForRealtimeTable(indexingConfig.getSortedColumn().get(0),
-        indexingConfig.getInvertedIndexColumns(), null, null,
-        indexingConfig.getNoDictionaryColumns(), 
indexingConfig.getVarLengthDictionaryColumns());
+        indexingConfig.getInvertedIndexColumns(), null, null, 
indexingConfig.getNoDictionaryColumns(),
+        indexingConfig.getVarLengthDictionaryColumns());
     RealtimeSegmentConverter converter =
         new RealtimeSegmentConverter(mutableSegmentImpl, segmentZKPropsConfig, 
outputDir.getAbsolutePath(), schema,
             tableNameWithType, tableConfig, segmentName, cdc, false);
@@ -148,6 +164,301 @@ public class RealtimeSegmentConverterTest {
     assertEquals(segmentMetadata.getEndOffset(), "100");
   }
 
+  @Test
+  public void test10RecordsIndexedRowMajorSegmentBuilder()
+      throws Exception {
+    File tmpDir = new File(TMP_DIR, "tmp_" + System.currentTimeMillis());
+    TableConfig tableConfig =
+        new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable")
+            .setTimeColumnName(DATE_TIME_COLUMN)
+            .setInvertedIndexColumns(Lists.newArrayList(STRING_COLUMN1, 
LONG_COLUMN1))
+            .setSortedColumn(LONG_COLUMN1)
+            .setRangeIndexColumns(Lists.newArrayList(STRING_COLUMN2))
+            .setNoDictionaryColumns(Lists.newArrayList(LONG_COLUMN2))
+            .setVarLengthDictionaryColumns(Lists.newArrayList(STRING_COLUMN3))
+            .setOnHeapDictionaryColumns(Lists.newArrayList(LONG_COLUMN3))
+            .setColumnMajorSegmentBuilderEnabled(false)
+            .build();
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension(STRING_COLUMN1, FieldSpec.DataType.STRING)
+        .addSingleValueDimension(STRING_COLUMN2, FieldSpec.DataType.STRING)
+        .addSingleValueDimension(STRING_COLUMN3, FieldSpec.DataType.STRING)
+        .addSingleValueDimension(STRING_COLUMN4, FieldSpec.DataType.STRING)
+        .addSingleValueDimension(LONG_COLUMN1, FieldSpec.DataType.LONG)
+        .addSingleValueDimension(LONG_COLUMN2, FieldSpec.DataType.LONG)
+        .addSingleValueDimension(LONG_COLUMN3, FieldSpec.DataType.LONG)
+        .addMultiValueDimension(MV_INT_COLUMN, FieldSpec.DataType.INT)
+        .addMetric(LONG_COLUMN4, FieldSpec.DataType.LONG)
+        .addDateTime(DATE_TIME_COLUMN, FieldSpec.DataType.LONG, 
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
+        .build();
+
+    String tableNameWithType = tableConfig.getTableName();
+    String segmentName = "testTable__0__0__123456";
+    IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+
+    DictionaryIndexConfig varLengthDictConf = new DictionaryIndexConfig(false, 
true);
+
+    RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
+        new 
RealtimeSegmentConfig.Builder().setTableNameWithType(tableNameWithType).setSegmentName(segmentName)
+            
.setStreamName(tableNameWithType).setSchema(schema).setTimeColumnName(DATE_TIME_COLUMN).setCapacity(1000)
+            .setAvgNumMultiValues(3)
+            .setIndex(Sets.newHashSet(LONG_COLUMN2), 
StandardIndexes.dictionary(), DictionaryIndexConfig.DISABLED)
+            .setIndex(Sets.newHashSet(Sets.newHashSet(STRING_COLUMN3)), 
StandardIndexes.dictionary(), varLengthDictConf)
+            .setIndex(Sets.newHashSet(STRING_COLUMN1, LONG_COLUMN1), 
StandardIndexes.inverted(), IndexConfig.ENABLED)
+            
.setSegmentZKMetadata(getSegmentZKMetadata(segmentName)).setOffHeap(true)
+            .setMemoryManager(new DirectMemoryManager(segmentName))
+            .setStatsHistory(RealtimeSegmentStatsHistory.deserialzeFrom(new 
File(tmpDir, "stats")))
+            .setConsumerDir(new File(tmpDir, "consumerDir").getAbsolutePath());
+
+    // create mutable segment impl
+    MutableSegmentImpl mutableSegmentImpl = new 
MutableSegmentImpl(realtimeSegmentConfigBuilder.build(), null);
+    List<GenericRow> rows = generateTestData();
+
+    for (GenericRow row : rows) {
+      mutableSegmentImpl.index(row, null);
+    }
+
+    File outputDir = new File(tmpDir, "outputDir");
+    SegmentZKPropsConfig segmentZKPropsConfig = new SegmentZKPropsConfig();
+    segmentZKPropsConfig.setStartOffset("1");
+    segmentZKPropsConfig.setEndOffset("100");
+    ColumnIndicesForRealtimeTable cdc = new 
ColumnIndicesForRealtimeTable(indexingConfig.getSortedColumn().get(0),
+        indexingConfig.getInvertedIndexColumns(), null, null, 
indexingConfig.getNoDictionaryColumns(),
+        indexingConfig.getVarLengthDictionaryColumns());
+    RealtimeSegmentConverter converter =
+        new RealtimeSegmentConverter(mutableSegmentImpl, segmentZKPropsConfig, 
outputDir.getAbsolutePath(), schema,
+            tableNameWithType, tableConfig, segmentName, cdc, false);
+    converter.build(SegmentVersion.v3, null);
+
+    File indexDir = new File(outputDir, segmentName);
+    SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
+    assertEquals(segmentMetadata.getVersion(), SegmentVersion.v3);
+    assertEquals(segmentMetadata.getTotalDocs(), rows.size());
+    assertEquals(segmentMetadata.getTimeColumn(), DATE_TIME_COLUMN);
+    assertEquals(segmentMetadata.getTimeUnit(), TimeUnit.MILLISECONDS);
+
+    long expectedStartTime = (long) rows.get(0).getValue(DATE_TIME_COLUMN);
+    assertEquals(segmentMetadata.getStartTime(), expectedStartTime);
+    long expectedEndTime = (long) rows.get(rows.size() - 
1).getValue(DATE_TIME_COLUMN);
+    assertEquals(segmentMetadata.getEndTime(), expectedEndTime);
+
+    
assertTrue(segmentMetadata.getAllColumns().containsAll(schema.getColumnNames()));
+    assertEquals(segmentMetadata.getStartOffset(), "1");
+    assertEquals(segmentMetadata.getEndOffset(), "100");
+
+    testSegment(rows, indexDir, tableConfig, segmentMetadata);
+  }
+
+  @Test
+  public void testNoRecordsIndexedColumnMajorSegmentBuilder()
+      throws Exception {
+    File tmpDir = new File(TMP_DIR, "tmp_" + System.currentTimeMillis());
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName(DATE_TIME_COLUMN)
+            
.setInvertedIndexColumns(Lists.newArrayList(STRING_COLUMN1)).setSortedColumn(LONG_COLUMN1)
+            .setRangeIndexColumns(Lists.newArrayList(STRING_COLUMN2))
+            .setNoDictionaryColumns(Lists.newArrayList(LONG_COLUMN2))
+            .setVarLengthDictionaryColumns(Lists.newArrayList(STRING_COLUMN3))
+            
.setOnHeapDictionaryColumns(Lists.newArrayList(LONG_COLUMN3)).setColumnMajorSegmentBuilderEnabled(true)
+            .build();
+    Schema schema = new 
Schema.SchemaBuilder().addSingleValueDimension(STRING_COLUMN1, 
FieldSpec.DataType.STRING)
+        .addSingleValueDimension(STRING_COLUMN2, FieldSpec.DataType.STRING)
+        .addSingleValueDimension(STRING_COLUMN3, FieldSpec.DataType.STRING)
+        .addSingleValueDimension(STRING_COLUMN4, FieldSpec.DataType.STRING)
+        .addSingleValueDimension(LONG_COLUMN1, FieldSpec.DataType.LONG)
+        .addSingleValueDimension(LONG_COLUMN2, FieldSpec.DataType.LONG)
+        .addSingleValueDimension(LONG_COLUMN3, FieldSpec.DataType.LONG)
+        .addMultiValueDimension(MV_INT_COLUMN, FieldSpec.DataType.INT)
+        .addMetric(LONG_COLUMN4, FieldSpec.DataType.LONG)
+        .addDateTime(DATE_TIME_COLUMN, FieldSpec.DataType.LONG, 
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
+
+    String tableNameWithType = tableConfig.getTableName();
+    String segmentName = "testTable__0__0__123456";
+    IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+
+    DictionaryIndexConfig varLengthDictConf = new DictionaryIndexConfig(false, 
true);
+
+    RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
+        new 
RealtimeSegmentConfig.Builder().setTableNameWithType(tableNameWithType).setSegmentName(segmentName)
+            
.setStreamName(tableNameWithType).setSchema(schema).setTimeColumnName(DATE_TIME_COLUMN).setCapacity(1000)
+            .setAvgNumMultiValues(3)
+            .setIndex(Sets.newHashSet(LONG_COLUMN2), 
StandardIndexes.dictionary(), DictionaryIndexConfig.DISABLED)
+            .setIndex(Sets.newHashSet(Sets.newHashSet(STRING_COLUMN3)), 
StandardIndexes.dictionary(), varLengthDictConf)
+            .setIndex(Sets.newHashSet(STRING_COLUMN1), 
StandardIndexes.inverted(), IndexConfig.ENABLED)
+            
.setSegmentZKMetadata(getSegmentZKMetadata(segmentName)).setOffHeap(true)
+            .setMemoryManager(new DirectMemoryManager(segmentName))
+            .setStatsHistory(RealtimeSegmentStatsHistory.deserialzeFrom(new 
File(tmpDir, "stats")))
+            .setConsumerDir(new File(tmpDir, "consumerDir").getAbsolutePath());
+
+    // create mutable segment impl
+    MutableSegmentImpl mutableSegmentImpl = new 
MutableSegmentImpl(realtimeSegmentConfigBuilder.build(), null);
+
+    File outputDir = new File(tmpDir, "outputDir");
+    SegmentZKPropsConfig segmentZKPropsConfig = new SegmentZKPropsConfig();
+    segmentZKPropsConfig.setStartOffset("1");
+    segmentZKPropsConfig.setEndOffset("100");
+    ColumnIndicesForRealtimeTable cdc = new 
ColumnIndicesForRealtimeTable(indexingConfig.getSortedColumn().get(0),
+        indexingConfig.getInvertedIndexColumns(), null, null, 
indexingConfig.getNoDictionaryColumns(),
+        indexingConfig.getVarLengthDictionaryColumns());
+    RealtimeSegmentConverter converter =
+        new RealtimeSegmentConverter(mutableSegmentImpl, segmentZKPropsConfig, 
outputDir.getAbsolutePath(), schema,
+            tableNameWithType, tableConfig, segmentName, cdc, false);
+    converter.build(SegmentVersion.v3, null);
+
+    File indexDir = new File(outputDir, segmentName);
+    SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
+    assertEquals(segmentMetadata.getTotalDocs(), 0);
+    assertEquals(segmentMetadata.getTimeColumn(), DATE_TIME_COLUMN);
+    assertEquals(segmentMetadata.getTimeUnit(), TimeUnit.MILLISECONDS);
+    assertEquals(segmentMetadata.getStartTime(), segmentMetadata.getEndTime());
+    
assertTrue(segmentMetadata.getAllColumns().containsAll(schema.getColumnNames()));
+    assertEquals(segmentMetadata.getStartOffset(), "1");
+    assertEquals(segmentMetadata.getEndOffset(), "100");
+  }
+
+  @Test
+  public void test10RecordsIndexedColumnMajorSegmentBuilder()
+      throws Exception {
+    File tmpDir = new File(TMP_DIR, "tmp_" + System.currentTimeMillis());
+    TableConfig tableConfig =
+        new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable")
+            .setTimeColumnName(DATE_TIME_COLUMN)
+            .setInvertedIndexColumns(Lists.newArrayList(STRING_COLUMN1, 
LONG_COLUMN1))
+            .setSortedColumn(LONG_COLUMN1)
+            .setRangeIndexColumns(Lists.newArrayList(STRING_COLUMN2))
+            .setNoDictionaryColumns(Lists.newArrayList(LONG_COLUMN2))
+            .setVarLengthDictionaryColumns(Lists.newArrayList(STRING_COLUMN3))
+            .setOnHeapDictionaryColumns(Lists.newArrayList(LONG_COLUMN3))
+            .setColumnMajorSegmentBuilderEnabled(true)
+            .build();
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension(STRING_COLUMN1, FieldSpec.DataType.STRING)
+        .addSingleValueDimension(STRING_COLUMN2, FieldSpec.DataType.STRING)
+        .addSingleValueDimension(STRING_COLUMN3, FieldSpec.DataType.STRING)
+        .addSingleValueDimension(STRING_COLUMN4, FieldSpec.DataType.STRING)
+        .addSingleValueDimension(LONG_COLUMN1, FieldSpec.DataType.LONG)
+        .addSingleValueDimension(LONG_COLUMN2, FieldSpec.DataType.LONG)
+        .addSingleValueDimension(LONG_COLUMN3, FieldSpec.DataType.LONG)
+        .addMultiValueDimension(MV_INT_COLUMN, FieldSpec.DataType.INT)
+        .addMetric(LONG_COLUMN4, FieldSpec.DataType.LONG)
+        .addDateTime(DATE_TIME_COLUMN, FieldSpec.DataType.LONG, 
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
+        .build();
+
+    String tableNameWithType = tableConfig.getTableName();
+    String segmentName = "testTable__0__0__123456";
+    IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+
+    DictionaryIndexConfig varLengthDictConf = new DictionaryIndexConfig(false, 
true);
+
+    RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
+        new 
RealtimeSegmentConfig.Builder().setTableNameWithType(tableNameWithType).setSegmentName(segmentName)
+            
.setStreamName(tableNameWithType).setSchema(schema).setTimeColumnName(DATE_TIME_COLUMN).setCapacity(1000)
+            .setAvgNumMultiValues(3)
+            .setIndex(Sets.newHashSet(LONG_COLUMN2), 
StandardIndexes.dictionary(), DictionaryIndexConfig.DISABLED)
+            .setIndex(Sets.newHashSet(Sets.newHashSet(STRING_COLUMN3)), 
StandardIndexes.dictionary(), varLengthDictConf)
+            .setIndex(Sets.newHashSet(STRING_COLUMN1, LONG_COLUMN1), 
StandardIndexes.inverted(), IndexConfig.ENABLED)
+            
.setSegmentZKMetadata(getSegmentZKMetadata(segmentName)).setOffHeap(true)
+            .setMemoryManager(new DirectMemoryManager(segmentName))
+            .setStatsHistory(RealtimeSegmentStatsHistory.deserialzeFrom(new 
File(tmpDir, "stats")))
+            .setConsumerDir(new File(tmpDir, "consumerDir").getAbsolutePath());
+
+    // create mutable segment impl
+    MutableSegmentImpl mutableSegmentImpl = new 
MutableSegmentImpl(realtimeSegmentConfigBuilder.build(), null);
+    List<GenericRow> rows = generateTestData();
+
+    for (GenericRow row : rows) {
+      mutableSegmentImpl.index(row, null);
+    }
+
+    File outputDir = new File(tmpDir, "outputDir");
+    SegmentZKPropsConfig segmentZKPropsConfig = new SegmentZKPropsConfig();
+    segmentZKPropsConfig.setStartOffset("1");
+    segmentZKPropsConfig.setEndOffset("100");
+    ColumnIndicesForRealtimeTable cdc = new 
ColumnIndicesForRealtimeTable(indexingConfig.getSortedColumn().get(0),
+        indexingConfig.getInvertedIndexColumns(), null, null, 
indexingConfig.getNoDictionaryColumns(),
+        indexingConfig.getVarLengthDictionaryColumns());
+    RealtimeSegmentConverter converter =
+        new RealtimeSegmentConverter(mutableSegmentImpl, segmentZKPropsConfig, 
outputDir.getAbsolutePath(), schema,
+            tableNameWithType, tableConfig, segmentName, cdc, false);
+    converter.build(SegmentVersion.v3, null);
+
+    File indexDir = new File(outputDir, segmentName);
+    SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
+    assertEquals(segmentMetadata.getVersion(), SegmentVersion.v3);
+    assertEquals(segmentMetadata.getTotalDocs(), rows.size());
+    assertEquals(segmentMetadata.getTimeColumn(), DATE_TIME_COLUMN);
+    assertEquals(segmentMetadata.getTimeUnit(), TimeUnit.MILLISECONDS);
+
+    long expectedStartTime = (long) rows.get(0).getValue(DATE_TIME_COLUMN);
+    assertEquals(segmentMetadata.getStartTime(), expectedStartTime);
+    long expectedEndTime = (long) rows.get(rows.size() - 
1).getValue(DATE_TIME_COLUMN);
+    assertEquals(segmentMetadata.getEndTime(), expectedEndTime);
+
+    
assertTrue(segmentMetadata.getAllColumns().containsAll(schema.getColumnNames()));
+    assertEquals(segmentMetadata.getStartOffset(), "1");
+    assertEquals(segmentMetadata.getEndOffset(), "100");
+
+    testSegment(rows, indexDir, tableConfig, segmentMetadata);
+  }
+
+  private void testSegment(List<GenericRow> rows, File indexDir,
+      TableConfig tableConfig, SegmentMetadataImpl segmentMetadata)
+      throws IOException {
+    SegmentLocalFSDirectory segmentDir = new SegmentLocalFSDirectory(indexDir, 
segmentMetadata, ReadMode.mmap);
+    SegmentDirectory.Reader segmentReader = segmentDir.createReader();
+
+    Map<String, ColumnIndexContainer> indexContainerMap = new HashMap<>();
+    Map<String, ColumnMetadata> columnMetadataMap = 
segmentMetadata.getColumnMetadataMap();
+    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, 
tableConfig);
+    for (Map.Entry<String, ColumnMetadata> entry : 
columnMetadataMap.entrySet()) {
+      indexContainerMap.put(entry.getKey(),
+          new PhysicalColumnIndexContainer(segmentReader, entry.getValue(), 
indexLoadingConfig));
+    }
+    ImmutableSegmentImpl segmentFile = new ImmutableSegmentImpl(segmentDir, 
segmentMetadata, indexContainerMap, null);
+
+    GenericRow readRow = new GenericRow();
+    int docId = 0;
+    for (GenericRow row : rows) {
+      segmentFile.getRecord(docId, readRow);
+      assertEquals(readRow.getValue(STRING_COLUMN1), 
row.getValue(STRING_COLUMN1));
+      assertEquals(readRow.getValue(STRING_COLUMN2), 
row.getValue(STRING_COLUMN2));
+      assertEquals(readRow.getValue(STRING_COLUMN3), 
row.getValue(STRING_COLUMN3));
+      assertEquals(readRow.getValue(STRING_COLUMN4), 
row.getValue(STRING_COLUMN4));
+      assertEquals(readRow.getValue(LONG_COLUMN1), row.getValue(LONG_COLUMN1));
+      assertEquals(readRow.getValue(LONG_COLUMN2), row.getValue(LONG_COLUMN2));
+      assertEquals(readRow.getValue(LONG_COLUMN3), row.getValue(LONG_COLUMN3));
+      assertEquals(readRow.getValue(LONG_COLUMN4), row.getValue(LONG_COLUMN4));
+      assertEquals(readRow.getValue(MV_INT_COLUMN), 
row.getValue(MV_INT_COLUMN));
+      assertEquals(readRow.getValue(DATE_TIME_COLUMN), 
row.getValue(DATE_TIME_COLUMN));
+
+      docId += 1;
+    }
+  }
+
+  private List<GenericRow> generateTestData() {
+    LinkedList<GenericRow> rows = new LinkedList<>();
+
+    for (int i = 0; i < 10; i++) {
+      GenericRow row = new GenericRow();
+      row.putValue(STRING_COLUMN1, "Hello" + i);
+      row.putValue(STRING_COLUMN2, "World" + i);
+      row.putValue(STRING_COLUMN3, "F1" + i);
+      row.putValue(STRING_COLUMN4, "F2" + i);
+      row.putValue(LONG_COLUMN1, 67L + i);
+      row.putValue(LONG_COLUMN2, 66L + i);
+      row.putValue(LONG_COLUMN3, 65L + i);
+      row.putValue(LONG_COLUMN4, 64L + i);
+      List<Integer> intList = new ArrayList<>();
+      intList.add(100 + i);
+      intList.add(200 + i);
+      row.putValue(MV_INT_COLUMN, intList.toArray());
+      row.putValue(DATE_TIME_COLUMN, 1697814309L + i);
+      rows.add(row);
+    }
+
+    return rows;
+  }
+
   private SegmentZKMetadata getSegmentZKMetadata(String segmentName) {
     SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(segmentName);
     segmentZKMetadata.setCreationTime(System.currentTimeMillis());
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
index c0a29afcf0..a6e2d067b6 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
@@ -54,6 +54,7 @@ public class RealtimeSegmentConverter {
   private final String _segmentName;
   private final ColumnIndicesForRealtimeTable _columnIndicesForRealtimeTable;
   private final boolean _nullHandlingEnabled;
+  private final boolean _enableColumnMajor;
 
   public RealtimeSegmentConverter(MutableSegmentImpl realtimeSegment, 
SegmentZKPropsConfig segmentZKPropsConfig,
       String outputPath, Schema schema, String tableName, TableConfig 
tableConfig, String segmentName,
@@ -70,11 +71,19 @@ public class RealtimeSegmentConverter {
     _tableConfig = tableConfig;
     _segmentName = segmentName;
     _nullHandlingEnabled = nullHandlingEnabled;
+    if (_tableConfig.getIngestionConfig() != null
+        && _tableConfig.getIngestionConfig().getStreamIngestionConfig() != 
null) {
+      _enableColumnMajor = _tableConfig.getIngestionConfig()
+          .getStreamIngestionConfig().getColumnMajorSegmentBuilderEnabled();
+    } else {
+      _enableColumnMajor = 
_tableConfig.getIndexingConfig().isColumnMajorSegmentBuilderEnabled();
+    }
   }
 
   public void build(@Nullable SegmentVersion segmentVersion, ServerMetrics 
serverMetrics)
       throws Exception {
     SegmentGeneratorConfig genConfig = new 
SegmentGeneratorConfig(_tableConfig, _dataSchema);
+
     // The segment generation code in SegmentColumnarIndexCreator will throw
     // exception if start and end time in time column are not in acceptable
     // range. We don't want the realtime consumption to stop (if an exception
@@ -110,15 +119,19 @@ public class RealtimeSegmentConverter {
 
     SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
     try (PinotSegmentRecordReader recordReader = new 
PinotSegmentRecordReader()) {
-      int[] sortedDocIds =
-          _columnIndicesForRealtimeTable.getSortedColumn() != null
-              ? 
_realtimeSegmentImpl.getSortedDocIdIterationOrderWithSortedColumn(
-                  _columnIndicesForRealtimeTable.getSortedColumn()) : null;
+      int[] sortedDocIds = _columnIndicesForRealtimeTable.getSortedColumn() != 
null
+          ? _realtimeSegmentImpl.getSortedDocIdIterationOrderWithSortedColumn(
+          _columnIndicesForRealtimeTable.getSortedColumn()) : null;
       recordReader.init(_realtimeSegmentImpl, sortedDocIds);
       RealtimeSegmentSegmentCreationDataSource dataSource =
           new RealtimeSegmentSegmentCreationDataSource(_realtimeSegmentImpl, 
recordReader);
       driver.init(genConfig, dataSource, 
TransformPipeline.getPassThroughPipeline());
-      driver.build();
+
+      if (!_enableColumnMajor) {
+        driver.build();
+      } else {
+        driver.buildByColumn(_realtimeSegmentImpl);
+      }
     }
 
     if (segmentPartitionConfig != null) {
@@ -130,8 +143,8 @@ public class RealtimeSegmentConverter {
     }
   }
 
-  private <C extends IndexConfig> void 
addIndexOrDefault(SegmentGeneratorConfig genConfig,
-      IndexType<C, ?, ?> indexType, @Nullable Collection<String> columns, C 
defaultConfig) {
+  private <C extends IndexConfig> void 
addIndexOrDefault(SegmentGeneratorConfig genConfig, IndexType<C, ?, ?> 
indexType,
+      @Nullable Collection<String> columns, C defaultConfig) {
     Map<String, C> config = indexType.getConfig(genConfig.getTableConfig(), 
genConfig.getSchema());
     if (columns != null) {
       for (String column : columns) {
@@ -154,4 +167,8 @@ public class RealtimeSegmentConverter {
     }
     return newSchema;
   }
+
+  public boolean isColumnMajorEnabled() {
+    return _enableColumnMajor;
+  }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
index a84c275299..10e0a5c1d0 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -40,6 +40,8 @@ import org.apache.pinot.segment.local.segment.creator.impl.nullvalue.NullValueVe
 import 
org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexPlugin;
 import 
org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexType;
 import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexType;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
+import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.segment.spi.creator.ColumnIndexCreationInfo;
@@ -336,6 +338,57 @@ public class SegmentColumnarIndexCreator implements 
SegmentCreator {
     _docIdCounter++;
   }
 
+  /**
+   * Indexes every value of {@code columnName}, reading the values from {@code segment}.
+   *
+   * <p>When {@code sortedDocIds} is non-null, source docs are visited in that order and written to consecutive
+   * doc ids starting at 0; otherwise docs 0..numDocs-1 keep their doc ids. An empty segment is a no-op.
+   */
+  @Override
+  public void indexColumn(String columnName, @Nullable int[] sortedDocIds, IndexSegment segment)
+      throws IOException {
+    // Iterate over each value in the column
+    int numDocs = segment.getSegmentMetadata().getTotalDocs();
+    if (numDocs == 0) {
+      return;
+    }
+
+    try (PinotSegmentColumnReader colReader = new PinotSegmentColumnReader(segment, columnName)) {
+      // Look up this column's creators once, outside the per-value loops below
+      Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex = _creatorsByColAndIndex.get(columnName);
+      NullValueVectorCreator nullVec = _nullValueVectorCreatorMap.get(columnName);
+      FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName);
+      SegmentDictionaryCreator dictionaryCreator = _dictionaryCreatorMap.get(columnName);
+      if (sortedDocIds != null) {
+        // Sorted mode: source doc sortedDocIds[i] becomes doc i of the new segment
+        int onDiskDocId = 0;
+        for (int docId : sortedDocIds) {
+          indexColumnValue(colReader, creatorsByIndex, columnName, fieldSpec, dictionaryCreator, docId, onDiskDocId,
+              nullVec);
+          onDiskDocId++;
+        }
+      } else {
+        // Unsorted mode: every doc keeps its source doc id
+        for (int docId = 0; docId < numDocs; docId++) {
+          indexColumnValue(colReader, creatorsByIndex, columnName, fieldSpec, dictionaryCreator, docId, docId, nullVec);
+        }
+      }
+    }
+  }
+
+  /**
+   * Reads one value of a column from the source segment and feeds it to the column's dictionary and index creators.
+   *
+   * @param colReader reader over the source column
+   * @param creatorsByIndex index creators registered for this column
+   * @param columnName name of the column, used in error messages
+   * @param fieldSpec field spec of the column; decides between single-value and multi-value handling
+   * @param dictionaryCreator dictionary creator registered for this column
+   * @param sourceDocId doc id to read from {@code colReader}
+   * @param onDiskDocPos doc id the value gets in the new segment (differs from {@code sourceDocId} when sorted)
+   * @param nullVec null value vector creator; only used when null handling is enabled
+   * @throws IllegalStateException if the reader returns {@code null} for the doc
+   */
+  private void indexColumnValue(PinotSegmentColumnReader colReader,
+      Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex, String columnName, FieldSpec fieldSpec,
+      SegmentDictionaryCreator dictionaryCreator, int sourceDocId, int onDiskDocPos, NullValueVectorCreator nullVec)
+      throws IOException {
+    Object columnValueToIndex = colReader.getValue(sourceDocId);
+    if (columnValueToIndex == null) {
+      // Include the doc id so a corrupt/unexpected source value can be located
+      throw new IllegalStateException("Null value for column: " + columnName + " at docId: " + sourceDocId);
+    }
+
+    if (fieldSpec.isSingleValueField()) {
+      indexSingleValueRow(dictionaryCreator, columnValueToIndex, creatorsByIndex);
+    } else {
+      indexMultiValueRow(dictionaryCreator, (Object[]) columnValueToIndex, creatorsByIndex);
+    }
+
+    // Nulls are recorded at the doc's position in the new segment, which differs from the source doc id when the
+    // docs are re-ordered by sortedDocIds.
+    if (_nullHandlingEnabled && colReader.isNull(sourceDocId)) {
+      nullVec.setNull(onDiskDocPos);
+    }
+  }
+
   private void indexSingleValueRow(SegmentDictionaryCreator dictionaryCreator, 
Object value,
       Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex)
       throws IOException {
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
index 0293e644c4..129bc35ccf 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
@@ -29,7 +29,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.commons.collections.CollectionUtils;
@@ -45,6 +47,7 @@ import 
org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
 import org.apache.pinot.segment.local.startree.v2.builder.MultipleTreesBuilder;
 import org.apache.pinot.segment.local.utils.CrcUtils;
 import org.apache.pinot.segment.local.utils.IngestionUtils;
+import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.converter.SegmentFormatConverter;
 import org.apache.pinot.segment.spi.creator.ColumnIndexCreationInfo;
@@ -102,7 +105,7 @@ public class SegmentIndexCreationDriverImpl implements 
SegmentIndexCreationDrive
   private int _totalDocs = 0;
   private File _tempIndexDir;
   private String _segmentName;
-  private long _totalRecordReadTime = 0;
+  private long _totalRecordReadTimeNs = 0;
   private long _totalIndexTime = 0;
   private long _totalStatsCollectorTime = 0;
   private boolean _continueOnError;
@@ -122,8 +125,8 @@ public class SegmentIndexCreationDriverImpl implements 
SegmentIndexCreationDrive
     TableConfig tableConfig = segmentGeneratorConfig.getTableConfig();
     FileFormat fileFormat = segmentGeneratorConfig.getFormat();
     String recordReaderClassName = 
segmentGeneratorConfig.getRecordReaderPath();
-    Set<String> sourceFields = IngestionUtils
-        .getFieldsForRecordExtractor(tableConfig.getIngestionConfig(), 
segmentGeneratorConfig.getSchema());
+    Set<String> sourceFields = 
IngestionUtils.getFieldsForRecordExtractor(tableConfig.getIngestionConfig(),
+        segmentGeneratorConfig.getSchema());
 
     // Allow for instantiation general record readers from a record reader 
path passed into segment generator config
     // If this is set, this will override the file format
@@ -142,8 +145,8 @@ public class SegmentIndexCreationDriverImpl implements 
SegmentIndexCreationDrive
     if (fileFormat == FileFormat.PINOT) {
       return new PinotSegmentRecordReader(dataFile, schema, 
segmentGeneratorConfig.getColumnSortOrder());
     } else {
-      return RecordReaderFactory
-          .getRecordReader(fileFormat, dataFile, sourceFields, 
segmentGeneratorConfig.getReaderConfig());
+      return RecordReaderFactory.getRecordReader(fileFormat, dataFile, 
sourceFields,
+          segmentGeneratorConfig.getReaderConfig());
     }
   }
 
@@ -199,8 +202,9 @@ public class SegmentIndexCreationDriverImpl implements 
SegmentIndexCreationDrive
       indexDir.mkdirs();
     }
 
-    _ingestionSchemaValidator = SchemaValidatorFactory
-        .getSchemaValidator(_dataSchema, _recordReader.getClass().getName(), 
config.getInputFilePath());
+    _ingestionSchemaValidator =
+        SchemaValidatorFactory.getSchemaValidator(_dataSchema, 
_recordReader.getClass().getName(),
+            config.getInputFilePath());
 
     // Create a temporary directory used in segment creation
     _tempIndexDir = new File(indexDir, "tmp-" + UUID.randomUUID());
@@ -229,16 +233,21 @@ public class SegmentIndexCreationDriverImpl implements 
SegmentIndexCreationDrive
       GenericRow reuse = new GenericRow();
       TransformPipeline.Result reusedResult = new TransformPipeline.Result();
       while (_recordReader.hasNext()) {
-        long recordReadStartTime = System.currentTimeMillis();
-        long recordReadStopTime = System.currentTimeMillis();
+        long recordReadStopTime = System.nanoTime();
         long indexStopTime;
         reuse.clear();
+
         try {
+          long recordReadStartTime = System.nanoTime();
           GenericRow decodedRow = _recordReader.next(reuse);
-          recordReadStartTime = System.currentTimeMillis();
+          recordReadStartTime = System.nanoTime();
+
+          // Should not be needed anymore.
+          // Add row to indexes
           _transformPipeline.processRow(decodedRow, reusedResult);
-          recordReadStopTime = System.currentTimeMillis();
-          _totalRecordReadTime += (recordReadStopTime - recordReadStartTime);
+
+          recordReadStopTime = System.nanoTime();
+          _totalRecordReadTimeNs += (recordReadStopTime - recordReadStartTime);
         } catch (Exception e) {
           if (!_continueOnError) {
             throw new RuntimeException("Error occurred while reading row 
during indexing", e);
@@ -273,6 +282,47 @@ public class SegmentIndexCreationDriverImpl implements 
SegmentIndexCreationDrive
     handlePostCreation();
   }
 
+  /**
+   * Builds the segment column by column, reading each column's values directly from {@code indexSegment} instead
+   * of iterating rows through the record reader.
+   *
+   * <p>Per-column statistics are still collected first. The record reader created in {@code init} must be a
+   * {@link PinotSegmentRecordReader}: it supplies the (possibly null) sorted doc id order, and it is closed in a
+   * finally block once index creation completes or fails.
+   *
+   * @param indexSegment segment whose column values are indexed
+   * @throws Exception if index creation or post-creation handling fails
+   */
+  public void buildByColumn(IndexSegment indexSegment)
+      throws Exception {
+    // Count the number of documents and gather per-column statistics
+    LOGGER.debug("Start building StatsCollector!");
+    buildIndexCreationInfo();
+    LOGGER.info("Finished building StatsCollector!");
+    LOGGER.info("Collected stats for {} documents", _totalDocs);
+
+    try {
+      // Initialize the index creation using the per-column statistics information
+      // TODO: _indexCreationInfoMap holds the reference to all unique values on heap (ColumnIndexCreationInfo ->
+      //       ColumnStatistics) throughout the segment creation. Find a way to release the memory early.
+      _indexCreator.init(_config, _segmentIndexCreationInfo, _indexCreationInfoMap, _dataSchema, _tempIndexDir);
+
+      // Build the indexes
+      LOGGER.info("Start building Index by column");
+
+      TreeSet<String> columns = _dataSchema.getPhysicalColumnNames();
+
+      // Column-major mode takes the doc order from the Pinot segment record reader; fail with a clear message
+      // instead of a bare ClassCastException if a different reader was configured.
+      Preconditions.checkState(_recordReader instanceof PinotSegmentRecordReader,
+          "Column major segment build requires a PinotSegmentRecordReader, got: %s",
+          _recordReader.getClass().getName());
+      // TODO: Eventually pull the doc Id sorting logic out of Record Reader so that all row oriented logic can be
+      //    removed from this code.
+      int[] sortedDocIds = ((PinotSegmentRecordReader) _recordReader).getSortedDocIds();
+      for (String col : columns) {
+        _indexCreator.indexColumn(col, sortedDocIds, indexSegment);
+      }
+    } catch (Exception e) {
+      // Release the index creator, but never let a failure while closing hide the original error.
+      try {
+        _indexCreator.close();
+      } catch (Exception closeException) {
+        e.addSuppressed(closeException);
+      }
+      throw e;
+    } finally {
+      // The record reader is created by the `init` method and needs to be closed and
+      // cleaned up even by the Column Mode builder.
+      _recordReader.close();
+    }
+
+    // TODO: Using column oriented, we can't catch incomplete records.  Does that matter?
+
+    LOGGER.info("Finished records indexing by column in IndexCreator!");
+
+    handlePostCreation();
+  }
+
   private void handlePostCreation()
       throws Exception {
     ColumnStatistics timeColumnStatistics = 
_segmentStats.getColumnProfileFor(_config.getTimeColumnName());
@@ -285,8 +335,8 @@ public class SegmentIndexCreationDriverImpl implements 
SegmentIndexCreationDrive
         // When totalDoc is 0, check whether 'failOnEmptySegment' option is 
true. If so, directly fail the segment
         // creation.
         Preconditions.checkArgument(!_config.isFailOnEmptySegment(),
-            "Failing the empty segment creation as the option 
'failOnEmptySegment' is set to: " + _config
-                .isFailOnEmptySegment());
+            "Failing the empty segment creation as the option 
'failOnEmptySegment' is set to: "
+                + _config.isFailOnEmptySegment());
         // Generate a unique name for a segment with no rows
         long now = System.currentTimeMillis();
         _segmentName = 
_config.getSegmentNameGenerator().generateSegmentName(sequenceId, now, now);
@@ -344,12 +394,13 @@ public class SegmentIndexCreationDriverImpl implements 
SegmentIndexCreationDrive
     // Persist creation metadata to disk
     persistCreationMeta(segmentOutputDir, crc, creationTime);
 
-    LOGGER.info("Driver, record read time : {}", _totalRecordReadTime);
+    LOGGER.info("Driver, record read time : {}", 
TimeUnit.NANOSECONDS.toMillis(_totalRecordReadTimeNs));
     LOGGER.info("Driver, stats collector time : {}", _totalStatsCollectorTime);
     LOGGER.info("Driver, indexing time : {}", _totalIndexTime);
   }
 
-  private void updatePostSegmentCreationIndexes(File indexDir) throws 
Exception {
+  private void updatePostSegmentCreationIndexes(File indexDir)
+      throws Exception {
     Set<IndexType> postSegCreationIndexes = 
IndexService.getInstance().getAllIndexes().stream()
         .filter(indexType -> indexType.getIndexBuildLifecycle() == 
IndexType.BuildLifecycle.POST_SEGMENT_CREATION)
         .collect(Collectors.toSet());
@@ -459,11 +510,11 @@ public class SegmentIndexCreationDriverImpl implements 
SegmentIndexCreationDrive
       if (storedType == DataType.BYTES) {
         defaultNullValue = new ByteArray((byte[]) defaultNullValue);
       }
-      boolean createDictionary = !rawIndexCreationColumns.contains(columnName)
-          && !rawIndexCompressionTypeKeys.contains(columnName);
+      boolean createDictionary =
+          !rawIndexCreationColumns.contains(columnName) && 
!rawIndexCompressionTypeKeys.contains(columnName);
       _indexCreationInfoMap.put(columnName,
-          new ColumnIndexCreationInfo(columnProfile, createDictionary, 
useVarLengthDictionary,
-              false/*isAutoGenerated*/, defaultNullValue));
+          new ColumnIndexCreationInfo(columnProfile, createDictionary, 
useVarLengthDictionary, false/*isAutoGenerated*/,
+              defaultNullValue));
     }
     _segmentIndexCreationInfo.setTotalDocs(_totalDocs);
   }
@@ -477,8 +528,8 @@ public class SegmentIndexCreationDriverImpl implements 
SegmentIndexCreationDrive
   @Deprecated
   public static boolean shouldUseVarLengthDictionary(String columnName, Set<String> varLengthDictColumns,
       DataType columnStoredType, ColumnStatistics columnProfile) {
-    return DictionaryIndexType.shouldUseVarLengthDictionary(
-        columnName, varLengthDictColumns, columnStoredType, columnProfile);
+    // Deprecated shim kept for existing callers; it only delegates to DictionaryIndexType.
+    return DictionaryIndexType.shouldUseVarLengthDictionary(columnName, varLengthDictColumns, columnStoredType,
+        columnProfile);
   }
 
   /**
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentCreator.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentCreator.java
index bc9420658f..ecc3f68ab3 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentCreator.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentCreator.java
@@ -23,7 +23,9 @@ import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.TreeMap;
+import javax.annotation.Nullable;
 import org.apache.commons.configuration.ConfigurationException;
+import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.index.creator.SegmentIndexCreationInfo;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
@@ -55,6 +57,16 @@ public interface SegmentCreator extends Closeable, 
Serializable {
   void indexRow(GenericRow row)
       throws IOException;
 
+  /**
+   * Indexes all values of one column, reading them from an existing segment.
+   *
+   * @param columnName - The name of the column to index.
+   * @param sortedDocIds - If not null, then this provides the sorted order of documents: entry {@code i} is the doc
+   *                     id in {@code segment} whose value becomes document {@code i} of the new segment.
+   * @param segment - Used to get the values of the column.
+   * @throws IOException if indexing the column fails
+   */
+  void indexColumn(String columnName, @Nullable int[] sortedDocIds, IndexSegment segment)
+      throws IOException;
+
   /**
    * Sets the name of the segment.
    *
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
index a5a10b8ae5..963af07a5c 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
@@ -66,8 +66,6 @@ import 
org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.apache.pinot.spi.utils.TimestampIndexUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
@@ -78,9 +76,7 @@ public class SegmentGeneratorConfig implements Serializable {
     EPOCH, SIMPLE_DATE
   }
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentGeneratorConfig.class);
   public static final String GENERATE_INV_BEFORE_PUSH_DEPREC_PROP = 
"generate.inverted.index.before.push";
-
   private final TableConfig _tableConfig;
   // NOTE: Use TreeMap to guarantee the order. The custom properties will be 
written into the segment metadata.
   private final TreeMap<String, String> _customProperties = new TreeMap<>();
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
index 75599cb2e6..9b7930cd74 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
@@ -61,6 +61,7 @@ public class IndexingConfig extends BaseJsonConfig {
   private SegmentPartitionConfig _segmentPartitionConfig;
   private boolean _aggregateMetrics;
   private boolean _nullHandlingEnabled;
+  private boolean _columnMajorSegmentBuilderEnabled = false;
 
   /**
    * If `optimizeDictionary` enabled, dictionary is not created for the 
high-cardinality
@@ -316,6 +317,14 @@ public class IndexingConfig extends BaseJsonConfig {
     _nullHandlingEnabled = nullHandlingEnabled;
   }
 
+  /**
+   * Returns whether the column-major segment builder is enabled ({@code false} by default).
+   */
+  public boolean isColumnMajorSegmentBuilderEnabled() {
+    return _columnMajorSegmentBuilderEnabled;
+  }
+
+  /**
+   * Enables or disables the column-major segment builder.
+   */
+  public void setColumnMajorSegmentBuilderEnabled(boolean columnMajorSegmentBuilderEnabled) {
+    _columnMajorSegmentBuilderEnabled = columnMajorSegmentBuilderEnabled;
+  }
+
   public boolean isOptimizeDictionary() {
     return _optimizeDictionary;
   }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java
index 0f4caa9b24..365911ee69 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java
@@ -34,6 +34,9 @@ public class StreamIngestionConfig extends BaseJsonConfig {
   @JsonPropertyDescription("All configs for the streams from which to ingest")
   private final List<Map<String, String>> _streamConfigMaps;
 
+  @JsonPropertyDescription("Whether to use column major mode when creating the 
segment.")
+  private boolean _columnMajorSegmentBuilderEnabled;
+
   @JsonCreator
   public StreamIngestionConfig(@JsonProperty("streamConfigMaps") 
List<Map<String, String>> streamConfigMaps) {
     _streamConfigMaps = streamConfigMaps;
@@ -42,4 +45,12 @@ public class StreamIngestionConfig extends BaseJsonConfig {
   public List<Map<String, String>> getStreamConfigMaps() {
     return _streamConfigMaps;
   }
+
+  /**
+   * Sets whether to use column major mode when creating the segment.
+   */
+  public void setColumnMajorSegmentBuilderEnabled(boolean enableColumnMajorSegmentCreation) {
+    _columnMajorSegmentBuilderEnabled = enableColumnMajorSegmentCreation;
+  }
+
+  /**
+   * Returns whether to use column major mode when creating the segment ({@code false} unless set).
+   * NOTE(review): uses a {@code get} prefix, unlike {@code IndexingConfig#isColumnMajorSegmentBuilderEnabled};
+   * kept as-is because renaming it would break existing callers.
+   */
+  public boolean getColumnMajorSegmentBuilderEnabled() {
+    return _columnMajorSegmentBuilderEnabled;
+  }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
index 64b8014b81..a65b4fc169 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
@@ -98,6 +98,7 @@ public class TableConfigBuilder {
   private Map<String, String> _streamConfigs;
   private SegmentPartitionConfig _segmentPartitionConfig;
   private boolean _nullHandlingEnabled;
+  private boolean _columnMajorSegmentBuilderEnabled;
   private List<String> _varLengthDictionaryColumns;
   private List<StarTreeIndexConfig> _starTreeIndexConfigs;
   private List<String> _jsonIndexColumns;
@@ -328,6 +329,11 @@ public class TableConfigBuilder {
     return this;
   }
 
+  /**
+   * Sets whether the built table config enables the column-major segment builder; the value is copied into the
+   * table's {@code IndexingConfig}.
+   */
+  public TableConfigBuilder setColumnMajorSegmentBuilderEnabled(boolean columnMajorSegmentBuilderEnabled) {
+    _columnMajorSegmentBuilderEnabled = columnMajorSegmentBuilderEnabled;
+    return this;
+  }
+
   public TableConfigBuilder setCustomConfig(TableCustomConfig customConfig) {
     _customConfig = customConfig;
     return this;
@@ -451,6 +457,7 @@ public class TableConfigBuilder {
     indexingConfig.setStreamConfigs(_streamConfigs);
     indexingConfig.setSegmentPartitionConfig(_segmentPartitionConfig);
     indexingConfig.setNullHandlingEnabled(_nullHandlingEnabled);
+    
indexingConfig.setColumnMajorSegmentBuilderEnabled(_columnMajorSegmentBuilderEnabled);
     indexingConfig.setVarLengthDictionaryColumns(_varLengthDictionaryColumns);
     indexingConfig.setStarTreeIndexConfigs(_starTreeIndexConfigs);
     indexingConfig.setJsonIndexColumns(_jsonIndexColumns);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to