This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch all_custom_metadata_props_in_segment_creation in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit af3ff4076ec166d1e9f3b036079c23b800e94899 Author: Xiang Fu <[email protected]> AuthorDate: Sun Nov 29 17:55:50 2020 -0800 Adding custom metadata props into both segment metadata properties file and zk metadata record --- .../helix/core/util/ZKMetadataUtils.java | 1 + .../core/segment/creator/impl/V1Constants.java | 2 ++ .../segment/index/metadata/SegmentMetadata.java | 3 +++ .../index/metadata/SegmentMetadataImpl.java | 28 ++++++++++++++++++++++ .../batch/common/SegmentGenerationTaskRunner.java | 1 + .../batch/hadoop/HadoopSegmentCreationMapper.java | 2 ++ .../spark/SparkSegmentGenerationJobRunner.java | 2 ++ .../standalone/SegmentGenerationJobRunner.java | 2 ++ .../spi/ingestion/batch/BatchConfigProperties.java | 1 + .../batch/spec/SegmentGenerationTaskSpec.java | 26 ++++++++++++++++++++ 10 files changed, 68 insertions(+) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java index 9cb4455..b4103a7 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java @@ -48,6 +48,7 @@ public class ZKMetadataUtils { offlineSegmentZKMetadata.setTotalDocs(segmentMetadata.getTotalDocs()); offlineSegmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime()); offlineSegmentZKMetadata.setCrc(Long.parseLong(segmentMetadata.getCrc())); + offlineSegmentZKMetadata.setCustomMap(segmentMetadata.getCustomMap()); // Extract column partition metadata (if any), and set it into segment ZK metadata. Map<String, ColumnPartitionMetadata> columnPartitionMap = new HashMap<>(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java index 0498f8c..0a09394 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java @@ -62,6 +62,8 @@ public class V1Constants { public static final String DATETIME_COLUMNS = "segment.datetime.column.names"; public static final String SEGMENT_TOTAL_DOCS = "segment.total.docs"; public static final String SEGMENT_PADDING_CHARACTER = "segment.padding.character"; + + public static final String CUSTOM_SUBSET = "custom"; } public static class Column { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadata.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadata.java index d01406c..d409a65 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadata.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadata.java @@ -19,6 +19,7 @@ package org.apache.pinot.core.segment.index.metadata; import java.io.File; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.pinot.spi.annotations.InterfaceAudience; import org.apache.pinot.spi.data.Schema; @@ -107,4 +108,6 @@ public interface SegmentMetadata { char getPaddingCharacter(); boolean close(); + + Map<String, String> getCustomMap(); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadataImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadataImpl.java index 1245034..15ed544 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadataImpl.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadataImpl.java @@ -44,6 +44,7 @@ import java.util.Collection; import java.util.Date; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -52,6 +53,7 @@ import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; +import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.commons.lang.StringEscapeUtils; import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata; @@ -104,6 +106,7 @@ public class SegmentMetadataImpl implements SegmentMetadata { private int _totalDocs; private long _segmentStartTime; private long _segmentEndTime; + private Map<String, String> _customMap; /** * For segments on disk. @@ -117,6 +120,7 @@ public class SegmentMetadataImpl implements SegmentMetadata { _columnMetadataMap = new HashMap<>(); _allColumns = new HashSet<>(); _schema = new Schema(); + _customMap = new HashMap<>(); init(segmentMetadataPropertiesConfiguration); File creationMetaFile = SegmentDirectoryPaths.findCreationMetaFile(indexDir); @@ -160,6 +164,7 @@ public class SegmentMetadataImpl implements SegmentMetadata { _allColumns = schema.getColumnNames(); _schema = schema; _totalDocs = segmentMetadataPropertiesConfiguration.getInt(SEGMENT_TOTAL_DOCS); + _customMap = new HashMap<>(); } public static PropertiesConfiguration getPropertiesConfiguration(File indexDir) { @@ -263,6 +268,18 @@ public class SegmentMetadataImpl implements SegmentMetadata { segmentMetadataPropertiesConfiguration.subset(StarTreeV2Constants.MetadataKey.getStarTreePrefix(i)))); } } + + // Set custom configs from metadata properties + setCustomConfigs(segmentMetadataPropertiesConfiguration, _customMap); + } + + private static void setCustomConfigs(Configuration segmentMetadataPropertiesConfiguration, Map<String, String> customConfigsMap) { + Configuration customConfigs = segmentMetadataPropertiesConfiguration.subset(V1Constants.MetadataKeys.Segment.CUSTOM_SUBSET); + Iterator<String> customKeysIter = customConfigs.getKeys(); + while (customKeysIter.hasNext()) { + String key = customKeysIter.next(); + customConfigsMap.put(key, customConfigs.getString(key)); + } } /** @@ -427,6 +444,11 @@ public class SegmentMetadataImpl implements SegmentMetadata { return false; } + @Override + public Map<String, String> getCustomMap() { + return _customMap; + } + public List<StarTreeV2Metadata> getStarTreeV2MetadataList() { return _starTreeV2MetadataList; } @@ -531,6 +553,12 @@ public class SegmentMetadataImpl implements SegmentMetadata { segmentMetadata.put("creatorName", _creatorName); segmentMetadata.put("paddingCharacter", String.valueOf(_paddingCharacter)); + ObjectNode customConfigs = JsonUtils.newObjectNode(); + for (String key : _customMap.keySet()) { + customConfigs.put(key, _customMap.get(key)); + } + segmentMetadata.set("custom", customConfigs); + ArrayNode columnsMetadata = JsonUtils.newArrayNode(); for (String column : _allColumns) { if (columnFilter != null && !columnFilter.contains(column)) { diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java index 9c80942..2bd0762 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java @@ -95,6 +95,7 @@ public class SegmentGenerationTaskRunner implements Serializable { segmentGeneratorConfig.setReaderConfig(recordReaderConfig); segmentGeneratorConfig.setRecordReaderPath(_taskSpec.getRecordReaderSpec().getClassName()); segmentGeneratorConfig.setInputFilePath(_taskSpec.getInputFilePath()); + segmentGeneratorConfig.setCustomProperties(_taskSpec.getCustomProperties()); //build segment SegmentIndexCreationDriverImpl segmentIndexCreationDriver = new SegmentIndexCreationDriverImpl(); diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java index c62c7dc..5b5d6f5 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java @@ -33,6 +33,7 @@ import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunne import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationUtils; import org.apache.pinot.spi.filesystem.PinotFS; import org.apache.pinot.spi.filesystem.PinotFSFactory; +import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties; import org.apache.pinot.spi.ingestion.batch.spec.Constants; import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec; import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationTaskSpec; @@ -139,6 +140,7 @@ public class HadoopSegmentCreationMapper extends Mapper<LongWritable, Text, Long .setTableConfig(SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI()).toJsonNode()); taskSpec.setSequenceId(idx); taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec()); + taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString()); // Start a thread that reports progress every minute during segment generation to prevent job getting killed Thread progressReporterThread = new Thread(getProgressReporter(context)); diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java index 9b36a91..5be70f2 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java @@ -47,6 +47,7 @@ import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationUtils; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.filesystem.PinotFS; import org.apache.pinot.spi.filesystem.PinotFSFactory; +import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties; import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner; import org.apache.pinot.spi.ingestion.batch.spec.Constants; import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec; @@ -303,6 +304,7 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI()).toJsonNode()); taskSpec.setSequenceId(idx); taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec()); + taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString()); SegmentGenerationTaskRunner taskRunner = new SegmentGenerationTaskRunner(taskSpec); String segmentName = taskRunner.run(); diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java index f60a7e3..fc2f9be 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java @@ -40,6 +40,7 @@ import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.filesystem.PinotFS; import org.apache.pinot.spi.filesystem.PinotFSFactory; +import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties; import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner; import org.apache.pinot.spi.ingestion.batch.spec.Constants; import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec; @@ -186,6 +187,7 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner { taskSpec.setTableConfig(tableConfig.toJsonNode()); taskSpec.setSequenceId(i); taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec()); + taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString()); LOGGER.info("Submitting one Segment Generation Task for {}", inputFileURI); _executorService.submit(() -> { diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java index 2ebda7b..5d276aa 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java @@ -39,6 +39,7 @@ public class BatchConfigProperties { public static final String RECORD_READER_CONFIG_CLASS = "recordReader.config.className"; public static final String RECORD_READER_PROP_PREFIX = "recordReader.prop"; + public static final String INPUT_DATA_FILE_URI_KEY = "input.data.file.uri"; /** * Helper method to create a batch config property */ diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationTaskSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationTaskSpec.java index 5455bfa..d19d306 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationTaskSpec.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationTaskSpec.java @@ -20,6 +20,8 @@ package org.apache.pinot.spi.ingestion.batch.spec; import com.fasterxml.jackson.databind.JsonNode; import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; import org.apache.pinot.spi.data.Schema; @@ -28,6 +30,8 @@ import org.apache.pinot.spi.data.Schema; * Note that this task creates a segment directory, not tar file. */ public class SegmentGenerationTaskSpec implements Serializable { + public static final String CUSTOM_SUBSET = "custom"; + public static final String CUSTOM_PREFIX = CUSTOM_SUBSET + '.'; /** * Table config to create segment @@ -64,6 +68,11 @@ public class SegmentGenerationTaskSpec implements Serializable { */ private int _sequenceId; + /** + * Custom properties set into segment metadata + */ + private Map<String, String> _customProperties = new HashMap<>(); + public JsonNode getTableConfig() { return _tableConfig; } @@ -119,4 +128,21 @@ public class SegmentGenerationTaskSpec implements Serializable { public void setSequenceId(int sequenceId) { _sequenceId = sequenceId; } + + public void setCustomProperty(String key, String value) { + if (!key.startsWith(CUSTOM_PREFIX)) { + key = CUSTOM_PREFIX + key; + } + _customProperties.put(key, value); + } + + public void setCustomProperties(Map<String, String> customProperties) { + for (String key : customProperties.keySet()) { + setCustomProperty(key, customProperties.get(key)); + } + } + + public Map<String, String> getCustomProperties() { + return _customProperties; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
