This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 888c961  Add segmentNameGeneratorType config to IndexingConfig (#7346)
888c961 is described below

commit 888c9613ffc434b583918b207945268853fc8621
Author: Seunghyun Lee <[email protected]>
AuthorDate: Fri Aug 20 17:59:15 2021 -0700

    Add segmentNameGeneratorType config to IndexingConfig (#7346)
    
    - Add segmentNameGeneratorType to IndexingConfig
    - Add SegmentNameGeneratorFactory
    - Add the test
    
    TODO: in the long term, we should have a dedicated
    segment generator related config in the table config.
---
 .../framework/SegmentProcessorFramework.java       | 19 ++++--
 .../framework/SegmentProcessorFrameworkTest.java   | 25 ++++++++
 .../creator/name/SegmentNameGeneratorFactory.java  | 72 ++++++++++++++++++++++
 .../pinot/spi/config/table/IndexingConfig.java     | 15 ++++-
 .../pinot/spi/config/table/IndexingConfigTest.java |  2 +
 5 files changed, 127 insertions(+), 6 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
index 241ead6..2bf0714 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
@@ -36,6 +36,9 @@ import 
org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
 import 
org.apache.pinot.segment.local.segment.creator.RecordReaderSegmentCreationDataSource;
 import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.segment.spi.creator.name.SegmentNameGeneratorFactory;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -110,11 +113,19 @@ public class SegmentProcessorFramework {
     // Segment creation phase
     LOGGER.info("Beginning segment creation phase on partitions: {}", 
partitionToFileManagerMap.keySet());
     List<File> outputSegmentDirs = new ArrayList<>();
-    SegmentGeneratorConfig generatorConfig =
-        new SegmentGeneratorConfig(_segmentProcessorConfig.getTableConfig(), 
_segmentProcessorConfig.getSchema());
+    TableConfig tableConfig = _segmentProcessorConfig.getTableConfig();
+    Schema schema = _segmentProcessorConfig.getSchema();
+    String segmentNamePrefix = 
_segmentProcessorConfig.getSegmentConfig().getSegmentNamePrefix();
+    SegmentGeneratorConfig generatorConfig = new 
SegmentGeneratorConfig(tableConfig, schema);
     generatorConfig.setOutDir(_segmentsOutputDir.getPath());
-    // TODO: Use NormalizedDateSegmentNameGenerator
-    
generatorConfig.setSegmentNamePrefix(_segmentProcessorConfig.getSegmentConfig().getSegmentNamePrefix());
+
+    if (tableConfig.getIndexingConfig().getSegmentNameGeneratorType() != null) 
{
+      generatorConfig.setSegmentNameGenerator(
+          SegmentNameGeneratorFactory.createSegmentNameGenerator(tableConfig, 
schema, segmentNamePrefix, null, false));
+    } else {
+      generatorConfig.setSegmentNamePrefix(segmentNamePrefix);
+    }
+
     int maxNumRecordsPerSegment = 
_segmentProcessorConfig.getSegmentConfig().getMaxNumRecordsPerSegment();
     CompositeTransformer passThroughTransformer = 
CompositeTransformer.getPassThroughTransformer();
     int sequenceId = 0;
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
index d1ab033..5aacc24 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
@@ -68,6 +68,8 @@ public class SegmentProcessorFrameworkTest {
 
   private TableConfig _tableConfig;
   private TableConfig _tableConfigNullValueEnabled;
+  private TableConfig _tableConfigSegmentNameGeneratorEnabled;
+
   private Schema _schema;
   private Schema _schemaMV;
 
@@ -93,6 +95,10 @@ public class SegmentProcessorFrameworkTest {
     _tableConfigNullValueEnabled =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setTimeColumnName("time")
             .setNullHandlingEnabled(true).build();
+    _tableConfigSegmentNameGeneratorEnabled =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setTimeColumnName("time").build();
+    
_tableConfigSegmentNameGeneratorEnabled.getIndexingConfig().setSegmentNameGeneratorType("normalizedDate");
+
     _schema =
         new 
Schema.SchemaBuilder().setSchemaName("mySchema").addSingleValueDimension("campaign",
 DataType.STRING, "")
             // NOTE: Intentionally put 1000 as default value to test skipping 
null values during rollup
@@ -423,6 +429,25 @@ public class SegmentProcessorFrameworkTest {
     assertEquals(segmentMetadata.getName(), 
"myPrefix_1597881600000_1597892400000_2");
     FileUtils.cleanDirectory(workingDir);
     rewindRecordReaders(_singleSegment);
+
+    config = new 
SegmentProcessorConfig.Builder().setTableConfig(_tableConfigSegmentNameGeneratorEnabled)
+        .setSchema(_schema).setSegmentConfig(new 
SegmentConfig.Builder().setMaxNumRecordsPerSegment(4)
+            .setSegmentNamePrefix("myPrefix").build()).build();
+    framework = new SegmentProcessorFramework(_singleSegment, config, 
workingDir);
+    outputSegments = framework.process();
+    assertEquals(outputSegments.size(), 3);
+    outputSegments.sort(null);
+    segmentMetadata = new SegmentMetadataImpl(outputSegments.get(0));
+    assertEquals(segmentMetadata.getTotalDocs(), 4);
+    assertEquals(segmentMetadata.getName(), 
"myPrefix_2020-08-18_2020-08-19_0");
+    segmentMetadata = new SegmentMetadataImpl(outputSegments.get(1));
+    assertEquals(segmentMetadata.getTotalDocs(), 4);
+    assertEquals(segmentMetadata.getName(), 
"myPrefix_2020-08-19_2020-08-19_1");
+    segmentMetadata = new SegmentMetadataImpl(outputSegments.get(2));
+    assertEquals(segmentMetadata.getTotalDocs(), 2);
+    assertEquals(segmentMetadata.getName(), 
"myPrefix_2020-08-20_2020-08-20_2");
+    FileUtils.cleanDirectory(workingDir);
+    rewindRecordReaders(_singleSegment);
   }
 
   @Test
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/SegmentNameGeneratorFactory.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/SegmentNameGeneratorFactory.java
new file mode 100644
index 0000000..85a8d14
--- /dev/null
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/SegmentNameGeneratorFactory.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.creator.name;
+
+import com.google.common.base.Preconditions;
+import java.util.Locale;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+
+
+/**
+ * Creates {@link SegmentNameGenerator} instances based on the {@code segmentNameGeneratorType} set in the table's
+ * indexing config.
+ *
+ * <p>Supported types, matched case-insensitively: {@code "simple"} (also used when the type is unset or blank) and
+ * {@code "normalizedDate"}.
+ */
+public class SegmentNameGeneratorFactory {
+  public static final String SIMPLE_SEGMENT_NAME_GENERATOR = "simple";
+  public static final String NORMALIZED_DATE_SEGMENT_NAME_GENERATOR = "normalizeddate";
+
+  // Static factory only; not instantiable.
+  private SegmentNameGeneratorFactory() {
+  }
+
+  /**
+   * Creates the segment name generator configured for the given table.
+   *
+   * @param tableConfig table config whose indexing config selects the generator type
+   * @param schema schema used to resolve the format of the table's time column (normalized-date generator only)
+   * @param prefix optional segment name prefix; the simple generator falls back to the table name when null
+   * @param postfix optional segment name postfix (simple generator only)
+   * @param excludeSequenceId passed through to the normalized-date generator
+   * @return a new segment name generator
+   * @throws NullPointerException if the validation config names a time column that the schema does not contain
+   * @throws UnsupportedOperationException if the configured generator type is not supported
+   */
+  public static SegmentNameGenerator createSegmentNameGenerator(TableConfig tableConfig, Schema schema,
+      @Nullable String prefix, @Nullable String postfix, boolean excludeSequenceId) {
+    String segmentNameGeneratorType = tableConfig.getIndexingConfig().getSegmentNameGeneratorType();
+    if (segmentNameGeneratorType == null || segmentNameGeneratorType.trim().isEmpty()) {
+      segmentNameGeneratorType = SIMPLE_SEGMENT_NAME_GENERATOR;
+    }
+
+    String tableName = tableConfig.getTableName();
+    // Locale.ROOT keeps the match independent of the JVM default locale
+    // (e.g. a Turkish default locale lower-cases 'I' to a dotless i, so "SIMPLE" would not match).
+    switch (segmentNameGeneratorType.trim().toLowerCase(Locale.ROOT)) {
+      case SIMPLE_SEGMENT_NAME_GENERATOR:
+        return new SimpleSegmentNameGenerator(prefix != null ? prefix : tableName, postfix);
+      case NORMALIZED_DATE_SEGMENT_NAME_GENERATOR:
+        SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
+        DateTimeFormatSpec dateTimeFormatSpec = null;
+        String timeColumnName = validationConfig.getTimeColumnName();
+        if (timeColumnName != null) {
+          DateTimeFieldSpec dateTimeFieldSpec = schema.getSpecForTimeColumn(timeColumnName);
+          Preconditions.checkNotNull(dateTimeFieldSpec,
+              "Schema does not contain the time column specified in the table config.");
+          dateTimeFormatSpec = new DateTimeFormatSpec(dateTimeFieldSpec.getFormat());
+        }
+        // The generator also receives the batch ingestion type/frequency and the time column's format
+        // (null when no time column is configured).
+        return new NormalizedDateSegmentNameGenerator(tableName, prefix, excludeSequenceId,
+            IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig),
+            IngestionConfigUtils.getBatchSegmentIngestionFrequency(tableConfig), dateTimeFormatSpec);
+      default:
+        throw new UnsupportedOperationException(
+            "Unsupported segment name generator type: " + segmentNameGeneratorType);
+    }
+  }
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
index c0f941e..992f085 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
@@ -30,8 +30,6 @@ public class IndexingConfig extends BaseJsonConfig {
   private List<String> _rangeIndexColumns;
   private List<String> _jsonIndexColumns;
   private List<String> _h3IndexColumns;
-  private boolean _autoGeneratedInvertedIndex;
-  private boolean _createInvertedIndexDuringSegmentGeneration;
   private List<String> _sortedColumn;
   private List<String> _bloomFilterColumns;
   private Map<String, BloomFilterConfig> _bloomFilterConfigs;
@@ -51,6 +49,11 @@ public class IndexingConfig extends BaseJsonConfig {
   private boolean _aggregateMetrics;
   private boolean _nullHandlingEnabled;
 
+  // TODO: Add a new configuration related to the segment generation
+  private boolean _autoGeneratedInvertedIndex;
+  private boolean _createInvertedIndexDuringSegmentGeneration;
+  private String _segmentNameGeneratorType;
+
   /**
    * The list of columns for which the variable length dictionary needs to be 
enabled in offline
    * segments. This is only valid for string and bytes columns and has no 
impact for columns of
@@ -250,4 +253,12 @@ public class IndexingConfig extends BaseJsonConfig {
   public void setNullHandlingEnabled(boolean nullHandlingEnabled) {
     _nullHandlingEnabled = nullHandlingEnabled;
   }
+
+  /**
+   * Returns the configured segment name generator type, or {@code null} if none has been set.
+   */
+  public String getSegmentNameGeneratorType() {
+    return this._segmentNameGeneratorType;
+  }
+
+  /**
+   * Sets the segment name generator type (e.g. {@code "simple"} or {@code "normalizedDate"}).
+   *
+   * @param segmentNameGeneratorType generator type name; {@code null} leaves the type unset
+   */
+  public void setSegmentNameGeneratorType(String segmentNameGeneratorType) {
+    this._segmentNameGeneratorType = segmentNameGeneratorType;
+  }
 }
diff --git 
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/IndexingConfigTest.java
 
b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/IndexingConfigTest.java
index 6c7058b..ff1905a 100644
--- 
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/IndexingConfigTest.java
+++ 
b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/IndexingConfigTest.java
@@ -52,6 +52,7 @@ public class IndexingConfigTest {
     indexingConfig.setNoDictionaryConfig(noDictionaryConfig);
     List<String> varLengthDictionaryColumns = Arrays.asList("a", "x", "z");
     indexingConfig.setVarLengthDictionaryColumns(varLengthDictionaryColumns);
+    indexingConfig.setSegmentNameGeneratorType("normalizedDate");
 
     indexingConfig = 
JsonUtils.stringToObject(JsonUtils.objectToString(indexingConfig), 
IndexingConfig.class);
 
@@ -63,6 +64,7 @@ public class IndexingConfigTest {
     assertEquals(indexingConfig.getBloomFilterColumns(), bloomFilterColumns);
     assertEquals(indexingConfig.getNoDictionaryConfig(), noDictionaryConfig);
     assertEquals(indexingConfig.getVarLengthDictionaryColumns(), 
varLengthDictionaryColumns);
+    assertEquals(indexingConfig.getSegmentNameGeneratorType(), 
"normalizedDate");
   }
 
   @Test

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to