This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 888c961 Add segmentNameGeneratorType config to IndexingConfig (#7346)
888c961 is described below
commit 888c9613ffc434b583918b207945268853fc8621
Author: Seunghyun Lee <[email protected]>
AuthorDate: Fri Aug 20 17:59:15 2021 -0700
Add segmentNameGeneratorType config to IndexingConfig (#7346)
- Add segmentNameGeneratorType to IndexingConfing
- Add SegmentNameGeneratorFactory
- Add the test
TODO: in the long term, we should have a dedicated
segment generator related config in the table config.
---
.../framework/SegmentProcessorFramework.java | 19 ++++--
.../framework/SegmentProcessorFrameworkTest.java | 25 ++++++++
.../creator/name/SegmentNameGeneratorFactory.java | 72 ++++++++++++++++++++++
.../pinot/spi/config/table/IndexingConfig.java | 15 ++++-
.../pinot/spi/config/table/IndexingConfigTest.java | 2 +
5 files changed, 127 insertions(+), 6 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
index 241ead6..2bf0714 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
@@ -36,6 +36,9 @@ import
org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
import
org.apache.pinot.segment.local.segment.creator.RecordReaderSegmentCreationDataSource;
import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.segment.spi.creator.name.SegmentNameGeneratorFactory;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -110,11 +113,19 @@ public class SegmentProcessorFramework {
// Segment creation phase
LOGGER.info("Beginning segment creation phase on partitions: {}",
partitionToFileManagerMap.keySet());
List<File> outputSegmentDirs = new ArrayList<>();
- SegmentGeneratorConfig generatorConfig =
- new SegmentGeneratorConfig(_segmentProcessorConfig.getTableConfig(),
_segmentProcessorConfig.getSchema());
+ TableConfig tableConfig = _segmentProcessorConfig.getTableConfig();
+ Schema schema = _segmentProcessorConfig.getSchema();
+ String segmentNamePrefix =
_segmentProcessorConfig.getSegmentConfig().getSegmentNamePrefix();
+ SegmentGeneratorConfig generatorConfig = new
SegmentGeneratorConfig(tableConfig, schema);
generatorConfig.setOutDir(_segmentsOutputDir.getPath());
- // TODO: Use NormalizedDateSegmentNameGenerator
-
generatorConfig.setSegmentNamePrefix(_segmentProcessorConfig.getSegmentConfig().getSegmentNamePrefix());
+
+ if (tableConfig.getIndexingConfig().getSegmentNameGeneratorType() != null)
{
+ generatorConfig.setSegmentNameGenerator(
+ SegmentNameGeneratorFactory.createSegmentNameGenerator(tableConfig,
schema, segmentNamePrefix, null, false));
+ } else {
+ generatorConfig.setSegmentNamePrefix(segmentNamePrefix);
+ }
+
int maxNumRecordsPerSegment =
_segmentProcessorConfig.getSegmentConfig().getMaxNumRecordsPerSegment();
CompositeTransformer passThroughTransformer =
CompositeTransformer.getPassThroughTransformer();
int sequenceId = 0;
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
index d1ab033..5aacc24 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
@@ -68,6 +68,8 @@ public class SegmentProcessorFrameworkTest {
private TableConfig _tableConfig;
private TableConfig _tableConfigNullValueEnabled;
+ private TableConfig _tableConfigSegmentNameGeneratorEnabled;
+
private Schema _schema;
private Schema _schemaMV;
@@ -93,6 +95,10 @@ public class SegmentProcessorFrameworkTest {
_tableConfigNullValueEnabled =
new
TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setTimeColumnName("time")
.setNullHandlingEnabled(true).build();
+ _tableConfigSegmentNameGeneratorEnabled =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setTimeColumnName("time").build();
+
_tableConfigSegmentNameGeneratorEnabled.getIndexingConfig().setSegmentNameGeneratorType("normalizedDate");
+
_schema =
new
Schema.SchemaBuilder().setSchemaName("mySchema").addSingleValueDimension("campaign",
DataType.STRING, "")
// NOTE: Intentionally put 1000 as default value to test skipping
null values during rollup
@@ -423,6 +429,25 @@ public class SegmentProcessorFrameworkTest {
assertEquals(segmentMetadata.getName(),
"myPrefix_1597881600000_1597892400000_2");
FileUtils.cleanDirectory(workingDir);
rewindRecordReaders(_singleSegment);
+
+ config = new
SegmentProcessorConfig.Builder().setTableConfig(_tableConfigSegmentNameGeneratorEnabled)
+ .setSchema(_schema).setSegmentConfig(new
SegmentConfig.Builder().setMaxNumRecordsPerSegment(4)
+ .setSegmentNamePrefix("myPrefix").build()).build();
+ framework = new SegmentProcessorFramework(_singleSegment, config,
workingDir);
+ outputSegments = framework.process();
+ assertEquals(outputSegments.size(), 3);
+ outputSegments.sort(null);
+ segmentMetadata = new SegmentMetadataImpl(outputSegments.get(0));
+ assertEquals(segmentMetadata.getTotalDocs(), 4);
+ assertEquals(segmentMetadata.getName(),
"myPrefix_2020-08-18_2020-08-19_0");
+ segmentMetadata = new SegmentMetadataImpl(outputSegments.get(1));
+ assertEquals(segmentMetadata.getTotalDocs(), 4);
+ assertEquals(segmentMetadata.getName(),
"myPrefix_2020-08-19_2020-08-19_1");
+ segmentMetadata = new SegmentMetadataImpl(outputSegments.get(2));
+ assertEquals(segmentMetadata.getTotalDocs(), 2);
+ assertEquals(segmentMetadata.getName(),
"myPrefix_2020-08-20_2020-08-20_2");
+ FileUtils.cleanDirectory(workingDir);
+ rewindRecordReaders(_singleSegment);
}
@Test
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/SegmentNameGeneratorFactory.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/SegmentNameGeneratorFactory.java
new file mode 100644
index 0000000..85a8d14
--- /dev/null
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/SegmentNameGeneratorFactory.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.creator.name;
+
+import com.google.common.base.Preconditions;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+
+
+public class SegmentNameGeneratorFactory {
+ public static final String SIMPLE_SEGMENT_NAME_GENERATOR = "simple";
+ public static final String NORMALIZED_DATE_SEGMENT_NAME_GENERATOR =
"normalizeddate";
+
+ private SegmentNameGeneratorFactory() {
+ }
+
+ /**
+ * Create the segment name generator given input configurations
+ */
+ public static SegmentNameGenerator createSegmentNameGenerator(TableConfig
tableConfig, Schema schema,
+ @Nullable String prefix, @Nullable String postfix, boolean
excludeSequenceId) {
+ String segmentNameGeneratorType =
tableConfig.getIndexingConfig().getSegmentNameGeneratorType();
+ if (segmentNameGeneratorType == null ||
segmentNameGeneratorType.isEmpty()) {
+ segmentNameGeneratorType = SIMPLE_SEGMENT_NAME_GENERATOR;
+ }
+
+ String tableName = tableConfig.getTableName();
+ switch (segmentNameGeneratorType.toLowerCase()) {
+ case SIMPLE_SEGMENT_NAME_GENERATOR:
+ if (prefix != null) {
+ return new SimpleSegmentNameGenerator(prefix, postfix);
+ }
+ return new SimpleSegmentNameGenerator(tableName, postfix);
+ case NORMALIZED_DATE_SEGMENT_NAME_GENERATOR:
+ SegmentsValidationAndRetentionConfig validationConfig =
tableConfig.getValidationConfig();
+ DateTimeFormatSpec dateTimeFormatSpec = null;
+ String timeColumnName = validationConfig.getTimeColumnName();
+ if (timeColumnName != null) {
+ DateTimeFieldSpec dateTimeFieldSpec =
schema.getSpecForTimeColumn(timeColumnName);
+ Preconditions.checkNotNull(dateTimeFieldSpec,
+ "Schema does not contain the time column specified in the table
config.");
+ dateTimeFormatSpec = new
DateTimeFormatSpec(dateTimeFieldSpec.getFormat());
+ }
+ return new NormalizedDateSegmentNameGenerator(tableName, prefix,
excludeSequenceId,
+ IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig),
+
IngestionConfigUtils.getBatchSegmentIngestionFrequency(tableConfig),
dateTimeFormatSpec);
+ default:
+ throw new UnsupportedOperationException("Unsupported segment name
generator type: " + segmentNameGeneratorType);
+ }
+ }
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
index c0f941e..992f085 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
@@ -30,8 +30,6 @@ public class IndexingConfig extends BaseJsonConfig {
private List<String> _rangeIndexColumns;
private List<String> _jsonIndexColumns;
private List<String> _h3IndexColumns;
- private boolean _autoGeneratedInvertedIndex;
- private boolean _createInvertedIndexDuringSegmentGeneration;
private List<String> _sortedColumn;
private List<String> _bloomFilterColumns;
private Map<String, BloomFilterConfig> _bloomFilterConfigs;
@@ -51,6 +49,11 @@ public class IndexingConfig extends BaseJsonConfig {
private boolean _aggregateMetrics;
private boolean _nullHandlingEnabled;
+ // TODO: Add a new configuration related to the segment generation
+ private boolean _autoGeneratedInvertedIndex;
+ private boolean _createInvertedIndexDuringSegmentGeneration;
+ private String _segmentNameGeneratorType;
+
/**
* The list of columns for which the variable length dictionary needs to be
enabled in offline
* segments. This is only valid for string and bytes columns and has no
impact for columns of
@@ -250,4 +253,12 @@ public class IndexingConfig extends BaseJsonConfig {
public void setNullHandlingEnabled(boolean nullHandlingEnabled) {
_nullHandlingEnabled = nullHandlingEnabled;
}
+
+ public String getSegmentNameGeneratorType() {
+ return _segmentNameGeneratorType;
+ }
+
+ public void setSegmentNameGeneratorType(String segmentNameGeneratorType) {
+ _segmentNameGeneratorType = segmentNameGeneratorType;
+ }
}
diff --git
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/IndexingConfigTest.java
b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/IndexingConfigTest.java
index 6c7058b..ff1905a 100644
---
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/IndexingConfigTest.java
+++
b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/IndexingConfigTest.java
@@ -52,6 +52,7 @@ public class IndexingConfigTest {
indexingConfig.setNoDictionaryConfig(noDictionaryConfig);
List<String> varLengthDictionaryColumns = Arrays.asList("a", "x", "z");
indexingConfig.setVarLengthDictionaryColumns(varLengthDictionaryColumns);
+ indexingConfig.setSegmentNameGeneratorType("normalizedDate");
indexingConfig =
JsonUtils.stringToObject(JsonUtils.objectToString(indexingConfig),
IndexingConfig.class);
@@ -63,6 +64,7 @@ public class IndexingConfigTest {
assertEquals(indexingConfig.getBloomFilterColumns(), bloomFilterColumns);
assertEquals(indexingConfig.getNoDictionaryConfig(), noDictionaryConfig);
assertEquals(indexingConfig.getVarLengthDictionaryColumns(),
varLengthDictionaryColumns);
+ assertEquals(indexingConfig.getSegmentNameGeneratorType(),
"normalizedDate");
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]