This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch support-spark-preprocessing in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 6231d1a22d7b8e665c73aac90f02d3e9ab8099c8 Author: Jack Li(Analytics Engineering) <[email protected]> AuthorDate: Thu Jul 22 16:05:03 2021 -0700 Support data preprocessing in Spark framework --- .../hadoop/job/HadoopSegmentPreprocessingJob.java | 166 +-------------- .../hadoop/job/mappers/SegmentCreationMapper.java | 2 +- .../job/preprocess/DataPreprocessingHelper.java | 228 --------------------- .../HadoopAvroDataPreprocessingHelper.java | 49 +++++ .../preprocess/HadoopDataPreprocessingHelper.java | 149 ++++++++++++++ .../HadoopDataPreprocessingHelperFactory.java | 39 ++++ .../hadoop/job/preprocess/HadoopJobPreparer.java | 10 + .../HadoopOrcDataPreprocessingHelper.java | 48 +++++ .../v0_deprecated/pinot-ingestion-common/pom.xml | 8 + .../ingestion/jobs/SegmentPreprocessingJob.java | 172 +++++++++++++++- .../preprocess/AvroDataPreprocessingHelper.java | 39 ++-- .../preprocess/DataPreprocessingHelper.java | 118 +++++++++++ .../preprocess/DataPreprocessingHelperFactory.java | 4 +- .../preprocess/OrcDataPreprocessingHelper.java | 39 ++-- .../preprocess/SampleTimeColumnExtractable.java | 9 + .../mappers/AvroDataPreprocessingMapper.java | 6 +- .../mappers/OrcDataPreprocessingMapper.java | 8 +- .../mappers/SegmentPreprocessingMapper.java | 4 +- .../AvroDataPreprocessingPartitioner.java | 4 +- .../partitioners/GenericPartitioner.java | 8 +- .../OrcDataPreprocessingPartitioner.java | 6 +- .../partitioners/PartitionFunctionFactory.java | 2 +- .../reducers/AvroDataPreprocessingReducer.java | 4 +- .../reducers/OrcDataPreprocessingReducer.java | 4 +- .../ingestion/utils}/DataPreprocessingUtils.java | 2 +- .../ingestion/utils}/InternalConfigConstants.java | 2 +- .../ingestion}/utils/preprocess/DataFileUtils.java | 2 +- .../ingestion}/utils/preprocess/HadoopUtils.java | 2 +- .../ingestion}/utils/preprocess/OrcUtils.java | 2 +- .../utils/preprocess/TextComparator.java | 2 +- .../spark/jobs/SparkSegmentPreprocessingJob.java | 89 ++++++++ 
.../SparkAvroDataPreprocessingHelper.java | 39 ++++ .../preprocess/SparkDataPreprocessingHelper.java | 27 +++ .../SparkDataPreprocessingHelperFactory.java | 39 ++++ .../SparkOrcDataPreprocessingHelper.java | 16 ++ .../org/apache/pinot/spark/utils}/HadoopUtils.java | 2 +- 36 files changed, 871 insertions(+), 479 deletions(-) diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java index 81a9902..9707487 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java @@ -18,33 +18,19 @@ */ package org.apache.pinot.hadoop.job; -import com.google.common.base.Preconditions; import java.io.IOException; import java.io.InputStream; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Map; import java.util.Properties; -import java.util.Set; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; -import org.apache.pinot.hadoop.job.preprocess.DataPreprocessingHelper; -import org.apache.pinot.hadoop.job.preprocess.DataPreprocessingHelperFactory; +import org.apache.pinot.hadoop.job.preprocess.HadoopDataPreprocessingHelperFactory; +import org.apache.pinot.hadoop.job.preprocess.HadoopDataPreprocessingHelper; import org.apache.pinot.hadoop.utils.PinotHadoopJobPreparationHelper; -import org.apache.pinot.hadoop.utils.preprocess.DataPreprocessingUtils; -import org.apache.pinot.hadoop.utils.preprocess.HadoopUtils; +import org.apache.pinot.ingestion.utils.preprocess.HadoopUtils; import 
org.apache.pinot.ingestion.common.ControllerRestApi; import org.apache.pinot.ingestion.common.JobConfigConstants; import org.apache.pinot.ingestion.jobs.SegmentPreprocessingJob; -import org.apache.pinot.spi.config.table.ColumnPartitionConfig; -import org.apache.pinot.spi.config.table.FieldConfig; -import org.apache.pinot.spi.config.table.IndexingConfig; -import org.apache.pinot.spi.config.table.SegmentPartitionConfig; -import org.apache.pinot.spi.config.table.TableConfig; -import org.apache.pinot.spi.config.table.TableCustomConfig; -import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,21 +45,6 @@ import org.slf4j.LoggerFactory; public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob { private static final Logger LOGGER = LoggerFactory.getLogger(HadoopSegmentPreprocessingJob.class); - private String _partitionColumn; - private int _numPartitions; - private String _partitionFunction; - - private String _sortingColumn; - private FieldSpec.DataType _sortingColumnType; - - private int _numOutputFiles; - private int _maxNumRecordsPerFile; - - private TableConfig _tableConfig; - private org.apache.pinot.spi.data.Schema _pinotTableSchema; - - private Set<DataPreprocessingUtils.Operation> _preprocessingOperations; - public HadoopSegmentPreprocessingJob(final Properties properties) { super(properties); } @@ -96,8 +67,8 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob { // Cleans up preprocessed output dir if exists cleanUpPreprocessedOutputs(_preprocessedOutputDir); - DataPreprocessingHelper dataPreprocessingHelper = - DataPreprocessingHelperFactory.generateDataPreprocessingHelper(_inputSegmentDir, _preprocessedOutputDir); + HadoopDataPreprocessingHelper dataPreprocessingHelper = + HadoopDataPreprocessingHelperFactory.generateDataPreprocessingHelper(_inputSegmentDir, _preprocessedOutputDir); dataPreprocessingHelper 
.registerConfigs(_tableConfig, _pinotTableSchema, _partitionColumn, _numPartitions, _partitionFunction, _sortingColumn, _sortingColumnType, _numOutputFiles, _maxNumRecordsPerFile); @@ -126,124 +97,6 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob { LOGGER.info("Finished pre-processing job in {}ms", (System.currentTimeMillis() - startTime)); } - private void fetchPreProcessingOperations() { - _preprocessingOperations = new HashSet<>(); - TableCustomConfig customConfig = _tableConfig.getCustomConfig(); - if (customConfig != null) { - Map<String, String> customConfigMap = customConfig.getCustomConfigs(); - if (customConfigMap != null && !customConfigMap.isEmpty()) { - String preprocessingOperationsString = - customConfigMap.getOrDefault(InternalConfigConstants.PREPROCESS_OPERATIONS, ""); - DataPreprocessingUtils.getOperations(_preprocessingOperations, preprocessingOperationsString); - } - } - } - - private void fetchPartitioningConfig() { - // Fetch partition info from table config. 
- if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.PARTITION)) { - LOGGER.info("Partitioning is disabled."); - return; - } - SegmentPartitionConfig segmentPartitionConfig = _tableConfig.getIndexingConfig().getSegmentPartitionConfig(); - if (segmentPartitionConfig != null) { - Map<String, ColumnPartitionConfig> columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap(); - Preconditions - .checkArgument(columnPartitionMap.size() <= 1, "There should be at most 1 partition setting in the table."); - if (columnPartitionMap.size() == 1) { - _partitionColumn = columnPartitionMap.keySet().iterator().next(); - _numPartitions = segmentPartitionConfig.getNumPartitions(_partitionColumn); - _partitionFunction = segmentPartitionConfig.getFunctionName(_partitionColumn); - } - } else { - LOGGER.info("Segment partition config is null for table: {}", _tableConfig.getTableName()); - } - } - - private void fetchSortingConfig() { - if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.SORT)) { - LOGGER.info("Sorting is disabled."); - return; - } - // Fetch sorting info from table config first. - List<String> sortingColumns = new ArrayList<>(); - List<FieldConfig> fieldConfigs = _tableConfig.getFieldConfigList(); - if (fieldConfigs != null && !fieldConfigs.isEmpty()) { - for (FieldConfig fieldConfig : fieldConfigs) { - if (fieldConfig.getIndexType() == FieldConfig.IndexType.SORTED) { - sortingColumns.add(fieldConfig.getName()); - } - } - } - if (!sortingColumns.isEmpty()) { - Preconditions.checkArgument(sortingColumns.size() == 1, "There should be at most 1 sorted column in the table."); - _sortingColumn = sortingColumns.get(0); - return; - } - - // There is no sorted column specified in field configs, try to find sorted column from indexing config. 
- IndexingConfig indexingConfig = _tableConfig.getIndexingConfig(); - List<String> sortedColumns = indexingConfig.getSortedColumn(); - if (sortedColumns != null) { - Preconditions.checkArgument(sortedColumns.size() <= 1, "There should be at most 1 sorted column in the table."); - if (sortedColumns.size() == 1) { - _sortingColumn = sortedColumns.get(0); - FieldSpec fieldSpec = _pinotTableSchema.getFieldSpecFor(_sortingColumn); - Preconditions.checkState(fieldSpec != null, "Failed to find sorting column: {} in the schema", _sortingColumn); - Preconditions - .checkState(fieldSpec.isSingleValueField(), "Cannot sort on multi-value column: %s", _sortingColumn); - _sortingColumnType = fieldSpec.getDataType(); - Preconditions - .checkState(_sortingColumnType.canBeASortedColumn(), "Cannot sort on %s column: %s", _sortingColumnType, - _sortingColumn); - LOGGER.info("Sorting the data with column: {} of type: {}", _sortingColumn, _sortingColumnType); - } - } - } - - private void fetchResizingConfig() { - if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.RESIZE)) { - LOGGER.info("Resizing is disabled."); - return; - } - TableCustomConfig tableCustomConfig = _tableConfig.getCustomConfig(); - if (tableCustomConfig == null) { - _numOutputFiles = 0; - return; - } - Map<String, String> customConfigsMap = tableCustomConfig.getCustomConfigs(); - if (customConfigsMap != null && customConfigsMap.containsKey(InternalConfigConstants.PREPROCESSING_NUM_REDUCERS)) { - _numOutputFiles = Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PREPROCESSING_NUM_REDUCERS)); - Preconditions.checkState(_numOutputFiles > 0, String - .format("The value of %s should be positive! 
Current value: %s", - InternalConfigConstants.PREPROCESSING_NUM_REDUCERS, _numOutputFiles)); - } else { - _numOutputFiles = 0; - } - - if (customConfigsMap != null) { - int maxNumRecords; - if (customConfigsMap.containsKey(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE)) { - LOGGER.warn("The config: {} from custom config is deprecated. Use {} instead.", - InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE, - InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE); - maxNumRecords = Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE)); - } else if (customConfigsMap.containsKey(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE)) { - maxNumRecords = - Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE)); - } else { - return; - } - // TODO: add a in-built maximum value for this config to avoid having too many small files. - // E.g. if the config is set to 1 which is smaller than this in-built value, the job should be abort from generating too many small files. - Preconditions.checkArgument(maxNumRecords > 0, - "The value of " + InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE - + " should be positive. 
Current value: " + maxNumRecords); - LOGGER.info("Setting {} to {}", InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, maxNumRecords); - _maxNumRecordsPerFile = maxNumRecords; - } - } - @Override protected Schema getSchema() throws IOException { @@ -265,15 +118,6 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob { protected void addAdditionalJobProperties(Job job) { } - private void setTableConfigAndSchema() - throws IOException { - _tableConfig = getTableConfig(); - _pinotTableSchema = getSchema(); - - Preconditions.checkState(_tableConfig != null, "Table config cannot be null."); - Preconditions.checkState(_pinotTableSchema != null, "Schema cannot be null"); - } - /** * Cleans up outputs in preprocessed output directory. */ diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java index 6927d5d..14955c9 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java @@ -35,7 +35,7 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.pinot.common.utils.TarGzCompressionUtils; -import org.apache.pinot.hadoop.job.InternalConfigConstants; +import org.apache.pinot.ingestion.utils.InternalConfigConstants; import org.apache.pinot.ingestion.common.JobConfigConstants; import org.apache.pinot.ingestion.jobs.SegmentCreationJob; import org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig; diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java deleted file mode 100644 index a505d09..0000000 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java +++ /dev/null @@ -1,228 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.pinot.hadoop.job.preprocess; - -import java.io.IOException; -import java.util.List; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.DoubleWritable; -import org.apache.hadoop.io.FloatWritable; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobContext; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.hadoop.mapreduce.Partitioner; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.pinot.hadoop.job.HadoopSegmentPreprocessingJob; -import org.apache.pinot.hadoop.job.InternalConfigConstants; -import org.apache.pinot.hadoop.job.partitioners.GenericPartitioner; -import org.apache.pinot.hadoop.utils.preprocess.HadoopUtils; -import org.apache.pinot.hadoop.utils.preprocess.TextComparator; -import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; -import org.apache.pinot.spi.config.table.TableConfig; -import org.apache.pinot.spi.data.DateTimeFieldSpec; -import org.apache.pinot.spi.data.DateTimeFormatSpec; -import org.apache.pinot.spi.data.FieldSpec; -import org.apache.pinot.spi.data.Schema; -import org.apache.pinot.spi.utils.IngestionConfigUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -public abstract class DataPreprocessingHelper { - private static final Logger LOGGER = LoggerFactory.getLogger(DataPreprocessingHelper.class); - - String _partitionColumn; - int _numPartitions; - String _partitionFunction; - - String _sortingColumn; - private FieldSpec.DataType _sortingColumnType; - - private int _numOutputFiles; - private int 
_maxNumRecordsPerFile; - - private TableConfig _tableConfig; - private Schema _pinotTableSchema; - - List<Path> _inputDataPaths; - Path _sampleRawDataPath; - Path _outputPath; - - public DataPreprocessingHelper(List<Path> inputDataPaths, Path outputPath) { - _inputDataPaths = inputDataPaths; - _sampleRawDataPath = inputDataPaths.get(0); - _outputPath = outputPath; - } - - public void registerConfigs(TableConfig tableConfig, Schema tableSchema, String partitionColumn, int numPartitions, - String partitionFunction, String sortingColumn, FieldSpec.DataType sortingColumnType, int numOutputFiles, - int maxNumRecordsPerFile) { - _tableConfig = tableConfig; - _pinotTableSchema = tableSchema; - _partitionColumn = partitionColumn; - _numPartitions = numPartitions; - _partitionFunction = partitionFunction; - - _sortingColumn = sortingColumn; - _sortingColumnType = sortingColumnType; - - _numOutputFiles = numOutputFiles; - _maxNumRecordsPerFile = maxNumRecordsPerFile; - } - - public Job setUpJob() - throws IOException { - LOGGER.info("Initializing a pre-processing job"); - Job job = Job.getInstance(HadoopUtils.DEFAULT_CONFIGURATION); - Configuration jobConf = job.getConfiguration(); - // Input and output paths. 
- int numInputPaths = _inputDataPaths.size(); - jobConf.setInt(JobContext.NUM_MAPS, numInputPaths); - setValidationConfigs(job, _sampleRawDataPath); - for (Path inputFile : _inputDataPaths) { - FileInputFormat.addInputPath(job, inputFile); - } - setHadoopJobConfigs(job); - - // Sorting column - if (_sortingColumn != null) { - LOGGER.info("Adding sorting column: {} to job config", _sortingColumn); - jobConf.set(InternalConfigConstants.SORTING_COLUMN_CONFIG, _sortingColumn); - jobConf.set(InternalConfigConstants.SORTING_COLUMN_TYPE, _sortingColumnType.name()); - - switch (_sortingColumnType) { - case INT: - job.setMapOutputKeyClass(IntWritable.class); - break; - case LONG: - job.setMapOutputKeyClass(LongWritable.class); - break; - case FLOAT: - job.setMapOutputKeyClass(FloatWritable.class); - break; - case DOUBLE: - job.setMapOutputKeyClass(DoubleWritable.class); - break; - case STRING: - job.setMapOutputKeyClass(Text.class); - job.setSortComparatorClass(TextComparator.class); - break; - default: - throw new IllegalStateException(); - } - } else { - job.setMapOutputKeyClass(NullWritable.class); - } - - // Partition column - int numReduceTasks = 0; - if (_partitionColumn != null) { - numReduceTasks = _numPartitions; - jobConf.set(InternalConfigConstants.ENABLE_PARTITIONING, "true"); - job.setPartitionerClass(GenericPartitioner.class); - jobConf.set(InternalConfigConstants.PARTITION_COLUMN_CONFIG, _partitionColumn); - if (_partitionFunction != null) { - jobConf.set(InternalConfigConstants.PARTITION_FUNCTION_CONFIG, _partitionFunction); - } - jobConf.setInt(InternalConfigConstants.NUM_PARTITIONS_CONFIG, numReduceTasks); - job.setPartitionerClass(getPartitioner()); - } else { - if (_numOutputFiles > 0) { - numReduceTasks = _numOutputFiles; - } else { - // default number of input paths - numReduceTasks = _inputDataPaths.size(); - } - } - // Maximum number of records per output file - jobConf - .set(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, 
Integer.toString(_maxNumRecordsPerFile)); - // Number of reducers - LOGGER.info("Number of reduce tasks for pre-processing job: {}", numReduceTasks); - job.setNumReduceTasks(numReduceTasks); - - setUpMapperReducerConfigs(job); - - return job; - } - - abstract Class<? extends Partitioner> getPartitioner(); - - abstract void setUpMapperReducerConfigs(Job job) - throws IOException; - - abstract String getSampleTimeColumnValue(String timeColumnName) - throws IOException; - - private void setValidationConfigs(Job job, Path path) - throws IOException { - SegmentsValidationAndRetentionConfig validationConfig = _tableConfig.getValidationConfig(); - - // TODO: Serialize and deserialize validation config by creating toJson and fromJson - // If the use case is an append use case, check that one time unit is contained in one file. If there is more than one, - // the job should be disabled, as we should not resize for these use cases. Therefore, setting the time column name - // and value - if (IngestionConfigUtils.getBatchSegmentIngestionType(_tableConfig).equalsIgnoreCase("APPEND")) { - job.getConfiguration().set(InternalConfigConstants.IS_APPEND, "true"); - String timeColumnName = validationConfig.getTimeColumnName(); - job.getConfiguration().set(InternalConfigConstants.TIME_COLUMN_CONFIG, timeColumnName); - if (timeColumnName != null) { - DateTimeFieldSpec dateTimeFieldSpec = _pinotTableSchema.getSpecForTimeColumn(timeColumnName); - if (dateTimeFieldSpec != null) { - DateTimeFormatSpec formatSpec = new DateTimeFormatSpec(dateTimeFieldSpec.getFormat()); - job.getConfiguration().set(InternalConfigConstants.SEGMENT_TIME_TYPE, formatSpec.getColumnUnit().toString()); - job.getConfiguration() - .set(InternalConfigConstants.SEGMENT_TIME_FORMAT, formatSpec.getTimeFormat().toString()); - job.getConfiguration().set(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN, formatSpec.getSDFPattern()); - } - } - job.getConfiguration().set(InternalConfigConstants.SEGMENT_PUSH_FREQUENCY, - 
IngestionConfigUtils.getBatchSegmentIngestionFrequency(_tableConfig)); - - String sampleTimeColumnValue = getSampleTimeColumnValue(timeColumnName); - if (sampleTimeColumnValue != null) { - job.getConfiguration().set(InternalConfigConstants.TIME_COLUMN_VALUE, sampleTimeColumnValue); - } - } - } - - private void setHadoopJobConfigs(Job job) { - job.setJarByClass(HadoopSegmentPreprocessingJob.class); - job.setJobName(getClass().getName()); - FileOutputFormat.setOutputPath(job, _outputPath); - job.getConfiguration().set(JobContext.JOB_NAME, this.getClass().getName()); - // Turn this on to always firstly use class paths that user specifies. - job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, "true"); - // Turn this off since we don't need an empty file in the output directory - job.getConfiguration().set(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, "false"); - - String hadoopTokenFileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION); - if (hadoopTokenFileLocation != null) { - job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, hadoopTokenFileLocation); - } - } -} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopAvroDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopAvroDataPreprocessingHelper.java new file mode 100644 index 0000000..89c836c --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopAvroDataPreprocessingHelper.java @@ -0,0 +1,49 @@ +package org.apache.pinot.hadoop.job.preprocess; + +import java.io.IOException; +import org.apache.avro.Schema; +import org.apache.avro.mapred.AvroKey; +import org.apache.avro.mapreduce.AvroJob; +import org.apache.avro.mapreduce.AvroKeyInputFormat; +import 
org.apache.avro.mapreduce.AvroKeyOutputFormat; +import org.apache.avro.mapreduce.AvroMultipleOutputs; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; +import org.apache.pinot.ingestion.preprocess.DataPreprocessingHelper; +import org.apache.pinot.ingestion.preprocess.mappers.AvroDataPreprocessingMapper; +import org.apache.pinot.ingestion.preprocess.reducers.AvroDataPreprocessingReducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class HadoopAvroDataPreprocessingHelper extends HadoopDataPreprocessingHelper { + private static final Logger LOGGER = LoggerFactory.getLogger(HadoopAvroDataPreprocessingHelper.class); + + public HadoopAvroDataPreprocessingHelper(DataPreprocessingHelper dataPreprocessingHelper) { + super(dataPreprocessingHelper); + } + + @Override + public void setUpMapperReducerConfigs(Job job) + throws IOException { + Schema avroSchema = (Schema) getSchema(_dataPreprocessingHelper._sampleRawDataPath); + LOGGER.info("Avro schema is: {}", avroSchema.toString(true)); + validateConfigsAgainstSchema(avroSchema); + + job.setInputFormatClass(AvroKeyInputFormat.class); + job.setMapperClass(AvroDataPreprocessingMapper.class); + + job.setReducerClass(AvroDataPreprocessingReducer.class); + AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, avroSchema); + AvroMultipleOutputs.setCountersEnabled(job, true); + // Use LazyOutputFormat to avoid creating empty files. 
+ LazyOutputFormat.setOutputFormatClass(job, AvroKeyOutputFormat.class); + job.setOutputKeyClass(AvroKey.class); + job.setOutputValueClass(NullWritable.class); + + AvroJob.setInputKeySchema(job, avroSchema); + AvroJob.setMapOutputValueSchema(job, avroSchema); + AvroJob.setOutputKeySchema(job, avroSchema); + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelper.java new file mode 100644 index 0000000..d71575f --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelper.java @@ -0,0 +1,149 @@ +package org.apache.pinot.hadoop.job.preprocess; + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobContext; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.pinot.hadoop.job.HadoopSegmentPreprocessingJob; +import org.apache.pinot.ingestion.preprocess.partitioners.GenericPartitioner; +import org.apache.pinot.ingestion.utils.preprocess.HadoopUtils; +import org.apache.pinot.ingestion.utils.preprocess.TextComparator; +import 
org.apache.pinot.ingestion.preprocess.DataPreprocessingHelper; +import org.apache.pinot.ingestion.utils.InternalConfigConstants; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public abstract class HadoopDataPreprocessingHelper implements HadoopJobPreparer { + private static final Logger LOGGER = LoggerFactory.getLogger(HadoopDataPreprocessingHelper.class); + + protected DataPreprocessingHelper _dataPreprocessingHelper; + + public HadoopDataPreprocessingHelper(DataPreprocessingHelper dataPreprocessingHelper) { + _dataPreprocessingHelper = dataPreprocessingHelper; + } + + public Job setUpJob() + throws IOException { + LOGGER.info("Initializing a pre-processing job"); + Job job = Job.getInstance(HadoopUtils.DEFAULT_CONFIGURATION); + Configuration jobConf = job.getConfiguration(); + // Input and output paths. + int numInputPaths = _dataPreprocessingHelper._inputDataPaths.size(); + jobConf.setInt(JobContext.NUM_MAPS, numInputPaths); + _dataPreprocessingHelper.setValidationConfigs(job, _dataPreprocessingHelper._sampleRawDataPath); + for (Path inputFile : _dataPreprocessingHelper._inputDataPaths) { + FileInputFormat.addInputPath(job, inputFile); + } + setHadoopJobConfigs(job); + + // Sorting column + if (_dataPreprocessingHelper._sortingColumn != null) { + LOGGER.info("Adding sorting column: {} to job config", _dataPreprocessingHelper._sortingColumn); + jobConf.set(InternalConfigConstants.SORTING_COLUMN_CONFIG, _dataPreprocessingHelper._sortingColumn); + jobConf.set(InternalConfigConstants.SORTING_COLUMN_TYPE, _dataPreprocessingHelper._sortingColumnType.name()); + + switch (_dataPreprocessingHelper._sortingColumnType) { + case INT: + job.setMapOutputKeyClass(IntWritable.class); + break; + case LONG: + job.setMapOutputKeyClass(LongWritable.class); + break; + case FLOAT: + 
job.setMapOutputKeyClass(FloatWritable.class); + break; + case DOUBLE: + job.setMapOutputKeyClass(DoubleWritable.class); + break; + case STRING: + job.setMapOutputKeyClass(Text.class); + job.setSortComparatorClass(TextComparator.class); + break; + default: + throw new IllegalStateException(); + } + } else { + job.setMapOutputKeyClass(NullWritable.class); + } + + // Partition column + int numReduceTasks = 0; + if (_dataPreprocessingHelper._partitionColumn != null) { + numReduceTasks = _dataPreprocessingHelper._numPartitions; + jobConf.set(InternalConfigConstants.ENABLE_PARTITIONING, "true"); + job.setPartitionerClass(GenericPartitioner.class); + jobConf.set(InternalConfigConstants.PARTITION_COLUMN_CONFIG, _dataPreprocessingHelper._partitionColumn); + if (_dataPreprocessingHelper._partitionFunction != null) { + jobConf.set(InternalConfigConstants.PARTITION_FUNCTION_CONFIG, _dataPreprocessingHelper._partitionFunction); + } + jobConf.setInt(InternalConfigConstants.NUM_PARTITIONS_CONFIG, numReduceTasks); + job.setPartitionerClass(_dataPreprocessingHelper.getPartitioner()); + } else { + if (_dataPreprocessingHelper._numOutputFiles > 0) { + numReduceTasks = _dataPreprocessingHelper._numOutputFiles; + } else { + // default number of input paths + numReduceTasks = _dataPreprocessingHelper._inputDataPaths.size(); + } + } + // Maximum number of records per output file + jobConf.set(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, + Integer.toString(_dataPreprocessingHelper._maxNumRecordsPerFile)); + // Number of reducers + LOGGER.info("Number of reduce tasks for pre-processing job: {}", numReduceTasks); + job.setNumReduceTasks(numReduceTasks); + + setUpMapperReducerConfigs(job); + + return job; + } + + private void setHadoopJobConfigs(Job job) { + job.setJarByClass(HadoopSegmentPreprocessingJob.class); + job.setJobName(getClass().getName()); + FileOutputFormat.setOutputPath(job, _dataPreprocessingHelper._outputPath); + 
job.getConfiguration().set(JobContext.JOB_NAME, this.getClass().getName()); + // Turn this on to always firstly use class paths that user specifies. + job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, "true"); + // Turn this off since we don't need an empty file in the output directory + job.getConfiguration().set(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, "false"); + + String hadoopTokenFileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION); + if (hadoopTokenFileLocation != null) { + job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, hadoopTokenFileLocation); + } + } + + public Object getSchema(Path inputPathDir) + throws IOException { + return _dataPreprocessingHelper.getSchema(inputPathDir); + } + + public void validateConfigsAgainstSchema(Object schema) { + _dataPreprocessingHelper.validateConfigsAgainstSchema(schema); + } + + public void registerConfigs(TableConfig tableConfig, Schema tableSchema, String partitionColumn, int numPartitions, + String partitionFunction, String sortingColumn, FieldSpec.DataType sortingColumnType, int numOutputFiles, + int maxNumRecordsPerFile) { + _dataPreprocessingHelper + .registerConfigs(tableConfig, tableSchema, partitionColumn, numPartitions, partitionFunction, sortingColumn, + sortingColumnType, numOutputFiles, maxNumRecordsPerFile); + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelperFactory.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelperFactory.java new file mode 100644 index 0000000..591fd46 --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelperFactory.java @@ -0,0 +1,39 @@ +package 
org.apache.pinot.hadoop.job.preprocess; + +import com.google.common.base.Preconditions; +import java.io.IOException; +import java.util.List; +import org.apache.hadoop.fs.Path; +import org.apache.pinot.ingestion.preprocess.AvroDataPreprocessingHelper; +import org.apache.pinot.ingestion.preprocess.OrcDataPreprocessingHelper; +import org.apache.pinot.ingestion.utils.preprocess.DataFileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Factory that picks the Hadoop preprocessing helper matching the data files found under the input path. + */ +public class HadoopDataPreprocessingHelperFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(HadoopDataPreprocessingHelperFactory.class); + + /** + * Looks up AVRO and ORC data files under {@code inputPaths} and wraps the format-specific helper in its Hadoop + * counterpart. + * + * @param inputPaths path scanned for AVRO and ORC data files + * @param outputPath output path handed to the format-specific helper + * @return an AVRO-backed helper when AVRO files are found, otherwise an ORC-backed helper + * @throws IllegalStateException if both AVRO and ORC files are found, or if neither is found + * @throws IOException on I/O failure + */ + public static HadoopDataPreprocessingHelper generateDataPreprocessingHelper(Path inputPaths, Path outputPath) + throws IOException { + final List<Path> avroFiles = DataFileUtils.getDataFiles(inputPaths, DataFileUtils.AVRO_FILE_EXTENSION); + final List<Path> orcFiles = DataFileUtils.getDataFiles(inputPaths, DataFileUtils.ORC_FILE_EXTENSION); + + int numAvroFiles = avroFiles.size(); + int numOrcFiles = orcFiles.size(); + Preconditions.checkState(numAvroFiles == 0 || numOrcFiles == 0, + "Cannot preprocess mixed AVRO files: %s and ORC files: %s in directories: %s", avroFiles, orcFiles, + inputPaths); + Preconditions + .checkState(numAvroFiles > 0 || numOrcFiles > 0, "Failed to find any AVRO or ORC file in directories: %s", + inputPaths); + + if (numAvroFiles > 0) { + LOGGER.info("Found AVRO files: {} in directories: {}", avroFiles, inputPaths); + return new HadoopAvroDataPreprocessingHelper(new AvroDataPreprocessingHelper(avroFiles, outputPath)); + } else { + LOGGER.info("Found ORC files: {} in directories: {}", orcFiles, inputPaths); + return new HadoopOrcDataPreprocessingHelper(new OrcDataPreprocessingHelper(orcFiles, outputPath)); + } + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopJobPreparer.java 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopJobPreparer.java new file mode 100644 index 0000000..ea2faf6 --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopJobPreparer.java @@ -0,0 +1,10 @@ +package org.apache.pinot.hadoop.job.preprocess; + +import java.io.IOException; +import org.apache.hadoop.mapreduce.Job; + + +public interface HadoopJobPreparer { + + void setUpMapperReducerConfigs(Job job) throws IOException; +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopOrcDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopOrcDataPreprocessingHelper.java new file mode 100644 index 0000000..11ade11 --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopOrcDataPreprocessingHelper.java @@ -0,0 +1,48 @@ +package org.apache.pinot.hadoop.job.preprocess; + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; +import org.apache.orc.OrcConf; +import org.apache.orc.mapred.OrcStruct; +import org.apache.orc.mapred.OrcValue; +import org.apache.orc.mapreduce.OrcInputFormat; +import org.apache.orc.mapreduce.OrcOutputFormat; +import org.apache.pinot.ingestion.preprocess.DataPreprocessingHelper; +import org.apache.pinot.ingestion.preprocess.mappers.OrcDataPreprocessingMapper; +import org.apache.pinot.ingestion.preprocess.reducers.OrcDataPreprocessingReducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class HadoopOrcDataPreprocessingHelper extends 
HadoopDataPreprocessingHelper { + private static final Logger LOGGER = LoggerFactory.getLogger(HadoopOrcDataPreprocessingHelper.class); + + public HadoopOrcDataPreprocessingHelper(DataPreprocessingHelper dataPreprocessingHelper) { + super(dataPreprocessingHelper); + } + + @Override + public void setUpMapperReducerConfigs(Job job) + throws IOException { + Object orcSchema = getSchema(_dataPreprocessingHelper._sampleRawDataPath); + String orcSchemaString = orcSchema.toString(); + LOGGER.info("Orc schema is: {}", orcSchemaString); + validateConfigsAgainstSchema(orcSchema); + + job.setInputFormatClass(OrcInputFormat.class); + job.setMapperClass(OrcDataPreprocessingMapper.class); + job.setMapOutputValueClass(OrcValue.class); + Configuration jobConf = job.getConfiguration(); + OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA.setString(jobConf, orcSchemaString); + + job.setReducerClass(OrcDataPreprocessingReducer.class); + // Use LazyOutputFormat to avoid creating empty files. + LazyOutputFormat.setOutputFormatClass(job, OrcOutputFormat.class); + job.setOutputKeyClass(NullWritable.class); + job.setOutputValueClass(OrcStruct.class); + OrcConf.MAPRED_OUTPUT_SCHEMA.setString(jobConf, orcSchemaString); + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/pom.xml b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/pom.xml index 284b698..3df7bb8 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/pom.xml +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/pom.xml @@ -117,5 +117,13 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro-mapred</artifactId> + </dependency> + <dependency> + <groupId>org.apache.orc</groupId> + <artifactId>orc-mapreduce</artifactId> + </dependency> </dependencies> </project> diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/jobs/SegmentPreprocessingJob.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/jobs/SegmentPreprocessingJob.java index 2e3b023..fe51e00 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/jobs/SegmentPreprocessingJob.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/jobs/SegmentPreprocessingJob.java @@ -21,11 +21,25 @@ package org.apache.pinot.ingestion.jobs; import com.google.common.base.Preconditions; import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Properties; +import java.util.Set; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.pinot.ingestion.common.ControllerRestApi; import org.apache.pinot.ingestion.common.JobConfigConstants; +import org.apache.pinot.ingestion.utils.DataPreprocessingUtils; +import org.apache.pinot.ingestion.utils.InternalConfigConstants; +import org.apache.pinot.spi.config.table.ColumnPartitionConfig; +import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.config.table.IndexingConfig; +import org.apache.pinot.spi.config.table.SegmentPartitionConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableCustomConfig; +import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,7 +52,7 @@ import org.slf4j.LoggerFactory; * * enable.preprocessing: false by default. Enables preprocessing job. 
*/ public abstract class SegmentPreprocessingJob extends BaseSegmentJob { - private static final Logger _logger = LoggerFactory.getLogger(SegmentPreprocessingJob.class); + private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPreprocessingJob.class); protected final Path _schemaFile; protected final Path _inputSegmentDir; protected final Path _preprocessedOutputDir; @@ -46,6 +60,21 @@ public abstract class SegmentPreprocessingJob extends BaseSegmentJob { protected final Path _pathToDependencyJar; protected boolean _enablePreprocessing; + protected String _partitionColumn; + protected int _numPartitions; + protected String _partitionFunction; + + protected String _sortingColumn; + protected FieldSpec.DataType _sortingColumnType; + + protected int _numOutputFiles; + protected int _maxNumRecordsPerFile; + + protected TableConfig _tableConfig; + protected org.apache.pinot.spi.data.Schema _pinotTableSchema; + + protected Set<DataPreprocessingUtils.Operation> _preprocessingOperations; + public SegmentPreprocessingJob(final Properties properties) { super(properties); @@ -59,13 +88,13 @@ public abstract class SegmentPreprocessingJob extends BaseSegmentJob { _pathToDependencyJar = getPathFromProperty(JobConfigConstants.PATH_TO_DEPS_JAR); _schemaFile = getPathFromProperty(JobConfigConstants.PATH_TO_SCHEMA); - _logger.info("*********************************************************************"); - _logger.info("enable.preprocessing: {}", _enablePreprocessing); - _logger.info("path.to.input: {}", _inputSegmentDir); - _logger.info("preprocess.path.to.output: {}", _preprocessedOutputDir); - _logger.info("path.to.deps.jar: {}", _pathToDependencyJar); - _logger.info("push.locations: {}", _pushLocations); - _logger.info("*********************************************************************"); + LOGGER.info("*********************************************************************"); + LOGGER.info("enable.preprocessing: {}", _enablePreprocessing); + 
LOGGER.info("path.to.input: {}", _inputSegmentDir); + LOGGER.info("preprocess.path.to.output: {}", _preprocessedOutputDir); + LOGGER.info("path.to.deps.jar: {}", _pathToDependencyJar); + LOGGER.info("push.locations: {}", _pushLocations); + LOGGER.info("*********************************************************************"); } protected abstract void run() @@ -90,4 +119,131 @@ public abstract class SegmentPreprocessingJob extends BaseSegmentJob { // TODO: support orc format in the future. return fileName.endsWith(".avro"); } + + protected void setTableConfigAndSchema() + throws IOException { + _tableConfig = getTableConfig(); + _pinotTableSchema = getSchema(); + + Preconditions.checkState(_tableConfig != null, "Table config cannot be null."); + Preconditions.checkState(_pinotTableSchema != null, "Schema cannot be null"); + } + + protected void fetchPreProcessingOperations() { + _preprocessingOperations = new HashSet<>(); + TableCustomConfig customConfig = _tableConfig.getCustomConfig(); + if (customConfig != null) { + Map<String, String> customConfigMap = customConfig.getCustomConfigs(); + if (customConfigMap != null && !customConfigMap.isEmpty()) { + String preprocessingOperationsString = + customConfigMap.getOrDefault(InternalConfigConstants.PREPROCESS_OPERATIONS, ""); + DataPreprocessingUtils.getOperations(_preprocessingOperations, preprocessingOperationsString); + } + } + } + + protected void fetchPartitioningConfig() { + // Fetch partition info from table config. 
+ if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.PARTITION)) { + LOGGER.info("Partitioning is disabled."); + return; + } + SegmentPartitionConfig segmentPartitionConfig = _tableConfig.getIndexingConfig().getSegmentPartitionConfig(); + if (segmentPartitionConfig != null) { + Map<String, ColumnPartitionConfig> columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap(); + Preconditions + .checkArgument(columnPartitionMap.size() <= 1, "There should be at most 1 partition setting in the table."); + if (columnPartitionMap.size() == 1) { + _partitionColumn = columnPartitionMap.keySet().iterator().next(); + _numPartitions = segmentPartitionConfig.getNumPartitions(_partitionColumn); + _partitionFunction = segmentPartitionConfig.getFunctionName(_partitionColumn); + } + } else { + LOGGER.info("Segment partition config is null for table: {}", _tableConfig.getTableName()); + } + } + + /** + * Resolves the sorted column and its data type when the SORT operation is enabled. Field configs with a SORTED + * index take precedence; the sorted column list of the indexing config is consulted only when no field config + * marks a column as sorted. + * + * @throws IllegalArgumentException if more than one sorted column is configured + * @throws IllegalStateException if the column is missing from the schema, is multi-valued, or has a type that + * cannot be used as a sorted column + */ + protected void fetchSortingConfig() { + if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.SORT)) { + LOGGER.info("Sorting is disabled."); + return; + } + // Fetch sorting info from table config first. + List<String> sortingColumns = new ArrayList<>(); + List<FieldConfig> fieldConfigs = _tableConfig.getFieldConfigList(); + if (fieldConfigs != null && !fieldConfigs.isEmpty()) { + for (FieldConfig fieldConfig : fieldConfigs) { + if (fieldConfig.getIndexType() == FieldConfig.IndexType.SORTED) { + sortingColumns.add(fieldConfig.getName()); + } + } + } + if (sortingColumns.isEmpty()) { + // There is no sorted column specified in field configs, try to find sorted column from indexing config. + IndexingConfig indexingConfig = _tableConfig.getIndexingConfig(); + List<String> sortedColumns = indexingConfig.getSortedColumn(); + if (sortedColumns != null) { + sortingColumns.addAll(sortedColumns); + } + } + Preconditions.checkArgument(sortingColumns.size() <= 1, "There should be at most 1 sorted column in the table."); + if (sortingColumns.isEmpty()) { + return; + } + + // Resolve and validate the column type no matter which config supplied the column, so that _sortingColumnType is + // never left unset while _sortingColumn is set. + _sortingColumn = sortingColumns.get(0); + FieldSpec fieldSpec = _pinotTableSchema.getFieldSpecFor(_sortingColumn); + Preconditions.checkState(fieldSpec != null, "Failed to find sorting column: %s in the schema", _sortingColumn); + Preconditions + .checkState(fieldSpec.isSingleValueField(), "Cannot sort on multi-value column: %s", _sortingColumn); + _sortingColumnType = fieldSpec.getDataType(); + Preconditions + .checkState(_sortingColumnType.canBeASortedColumn(), "Cannot sort on %s column: %s", _sortingColumnType, + _sortingColumn); + LOGGER.info("Sorting the data with column: {} of type: {}", _sortingColumn, _sortingColumnType); + } 
+ + protected void fetchResizingConfig() { + if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.RESIZE)) { + LOGGER.info("Resizing is disabled."); + return; + } + TableCustomConfig tableCustomConfig = _tableConfig.getCustomConfig(); + if (tableCustomConfig == null) { + _numOutputFiles = 0; + return; + } + Map<String, String> customConfigsMap = tableCustomConfig.getCustomConfigs(); + if (customConfigsMap != null && customConfigsMap.containsKey(InternalConfigConstants.PREPROCESSING_NUM_REDUCERS)) { + _numOutputFiles = Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PREPROCESSING_NUM_REDUCERS)); + Preconditions.checkState(_numOutputFiles > 0, String + .format("The value of %s should be positive!
Current value: %s", + InternalConfigConstants.PREPROCESSING_NUM_REDUCERS, _numOutputFiles)); + } else { + _numOutputFiles = 0; + } + + if (customConfigsMap != null) { + int maxNumRecords; + if (customConfigsMap.containsKey(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE)) { + LOGGER.warn("The config: {} from custom config is deprecated. Use {} instead.", + InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE, + InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE); + maxNumRecords = Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE)); + } else if (customConfigsMap.containsKey(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE)) { + maxNumRecords = + Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE)); + } else { + return; + } + // TODO: add an in-built lower bound for this config to avoid generating too many small files. + // E.g. if the config is set to 1, which is smaller than this in-built value, the job should abort instead of generating too many small files. + Preconditions.checkArgument(maxNumRecords > 0, + "The value of " + InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE + + " should be positive. 
Current value: " + maxNumRecords); + LOGGER.info("Setting {} to {}", InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, maxNumRecords); + _maxNumRecordsPerFile = maxNumRecords; + } + } } diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/AvroDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/AvroDataPreprocessingHelper.java similarity index 79% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/AvroDataPreprocessingHelper.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/AvroDataPreprocessingHelper.java index 9e5f5f2..260c153 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/AvroDataPreprocessingHelper.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/AvroDataPreprocessingHelper.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.hadoop.job.preprocess; +package org.apache.pinot.ingestion.preprocess; import com.google.common.base.Preconditions; import java.io.IOException; @@ -39,10 +39,10 @@ import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; -import org.apache.pinot.hadoop.job.mappers.AvroDataPreprocessingMapper; -import org.apache.pinot.hadoop.job.partitioners.AvroDataPreprocessingPartitioner; -import org.apache.pinot.hadoop.job.reducers.AvroDataPreprocessingReducer; -import org.apache.pinot.hadoop.utils.preprocess.HadoopUtils; +import org.apache.pinot.ingestion.preprocess.mappers.AvroDataPreprocessingMapper; +import org.apache.pinot.ingestion.preprocess.partitioners.AvroDataPreprocessingPartitioner; +import org.apache.pinot.ingestion.preprocess.reducers.AvroDataPreprocessingReducer; +import org.apache.pinot.ingestion.utils.preprocess.HadoopUtils; import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,30 +61,19 @@ public class AvroDataPreprocessingHelper extends DataPreprocessingHelper { } @Override - public void setUpMapperReducerConfigs(Job job) + public Object getSchema(Path inputPathDir) throws IOException { - Schema avroSchema = getAvroSchema(_sampleRawDataPath); - LOGGER.info("Avro schema is: {}", avroSchema.toString(true)); - validateConfigsAgainstSchema(avroSchema); - - job.setInputFormatClass(AvroKeyInputFormat.class); - job.setMapperClass(AvroDataPreprocessingMapper.class); - - job.setReducerClass(AvroDataPreprocessingReducer.class); - AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, avroSchema); - AvroMultipleOutputs.setCountersEnabled(job, true); - // Use LazyOutputFormat to avoid creating empty files. 
- LazyOutputFormat.setOutputFormatClass(job, AvroKeyOutputFormat.class); - job.setOutputKeyClass(AvroKey.class); - job.setOutputValueClass(NullWritable.class); + return getAvroSchema(inputPathDir); + } - AvroJob.setInputKeySchema(job, avroSchema); - AvroJob.setMapOutputValueSchema(job, avroSchema); - AvroJob.setOutputKeySchema(job, avroSchema); + @Override + public void validateConfigsAgainstSchema(Object schema) { + Schema avroSchema = (Schema) schema; + validateConfigsAgainstSchema(avroSchema); } @Override - String getSampleTimeColumnValue(String timeColumnName) + public String getSampleTimeColumnValue(String timeColumnName) throws IOException { String sampleTimeColumnValue; try (DataFileStream<GenericRecord> dataStreamReader = getAvroReader(_sampleRawDataPath)) { @@ -99,7 +88,7 @@ public class AvroDataPreprocessingHelper extends DataPreprocessingHelper { * @return Input schema * @throws IOException exception when accessing to IO */ - private Schema getAvroSchema(Path inputPathDir) + protected Schema getAvroSchema(Path inputPathDir) throws IOException { Schema avroSchema = null; for (FileStatus fileStatus : HadoopUtils.DEFAULT_FILE_SYSTEM.listStatus(inputPathDir)) { diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/DataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/DataPreprocessingHelper.java new file mode 100644 index 0000000..cb0a738 --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/DataPreprocessingHelper.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.ingestion.preprocess; + +import java.io.IOException; +import java.util.List; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Partitioner; +import org.apache.pinot.ingestion.utils.InternalConfigConstants; +import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.DateTimeFieldSpec; +import org.apache.pinot.spi.data.DateTimeFormatSpec; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.IngestionConfigUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public abstract class DataPreprocessingHelper implements SampleTimeColumnExtractable { + private static final Logger LOGGER = LoggerFactory.getLogger(DataPreprocessingHelper.class); + + public String _partitionColumn; + public int _numPartitions; + public String _partitionFunction; + + public String _sortingColumn; + public FieldSpec.DataType _sortingColumnType; + + public int _numOutputFiles; + public int _maxNumRecordsPerFile; + + public TableConfig _tableConfig; + public Schema _pinotTableSchema; + + public List<Path> _inputDataPaths; + public Path _sampleRawDataPath; + public Path _outputPath; + + public DataPreprocessingHelper(List<Path> 
inputDataPaths, Path outputPath) { + _inputDataPaths = inputDataPaths; + _sampleRawDataPath = inputDataPaths.get(0); + _outputPath = outputPath; + } + + public void registerConfigs(TableConfig tableConfig, Schema tableSchema, String partitionColumn, int numPartitions, + String partitionFunction, String sortingColumn, FieldSpec.DataType sortingColumnType, int numOutputFiles, + int maxNumRecordsPerFile) { + _tableConfig = tableConfig; + _pinotTableSchema = tableSchema; + _partitionColumn = partitionColumn; + _numPartitions = numPartitions; + _partitionFunction = partitionFunction; + + _sortingColumn = sortingColumn; + _sortingColumnType = sortingColumnType; + + _numOutputFiles = numOutputFiles; + _maxNumRecordsPerFile = maxNumRecordsPerFile; + } + + public abstract Class<? extends Partitioner> getPartitioner(); + + abstract public Object getSchema(Path inputPathDir) + throws IOException; + + abstract public void validateConfigsAgainstSchema(Object schema); + + public void setValidationConfigs(Job job, Path path) + throws IOException { + SegmentsValidationAndRetentionConfig validationConfig = _tableConfig.getValidationConfig(); + + // TODO: Serialize and deserialize validation config by creating toJson and fromJson + // If the use case is an append use case, check that one time unit is contained in one file. If there is more than one, + // the job should be disabled, as we should not resize for these use cases. 
Therefore, setting the time column name + // and value + if (IngestionConfigUtils.getBatchSegmentIngestionType(_tableConfig).equalsIgnoreCase("APPEND")) { + job.getConfiguration().set(InternalConfigConstants.IS_APPEND, "true"); + String timeColumnName = validationConfig.getTimeColumnName(); + // NOTE(review): Hadoop's Configuration.set() appears to reject null values, which would make the null check below + // unreachable for a null time column name -- confirm how APPEND tables without a time column should be handled. + job.getConfiguration().set(InternalConfigConstants.TIME_COLUMN_CONFIG, timeColumnName); + if (timeColumnName != null) { + DateTimeFieldSpec dateTimeFieldSpec = _pinotTableSchema.getSpecForTimeColumn(timeColumnName); + if (dateTimeFieldSpec != null) { + DateTimeFormatSpec formatSpec = new DateTimeFormatSpec(dateTimeFieldSpec.getFormat()); + job.getConfiguration().set(InternalConfigConstants.SEGMENT_TIME_TYPE, formatSpec.getColumnUnit().toString()); + job.getConfiguration() + .set(InternalConfigConstants.SEGMENT_TIME_FORMAT, formatSpec.getTimeFormat().toString()); + job.getConfiguration().set(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN, formatSpec.getSDFPattern()); + } + } + job.getConfiguration().set(InternalConfigConstants.SEGMENT_PUSH_FREQUENCY, + IngestionConfigUtils.getBatchSegmentIngestionFrequency(_tableConfig)); + + String sampleTimeColumnValue = getSampleTimeColumnValue(timeColumnName); + if (sampleTimeColumnValue != null) { + job.getConfiguration().set(InternalConfigConstants.TIME_COLUMN_VALUE, sampleTimeColumnValue); + } + } + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/DataPreprocessingHelperFactory.java similarity index 95% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java rename to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/DataPreprocessingHelperFactory.java index 2e91773..1cb07e3 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/DataPreprocessingHelperFactory.java @@ -16,13 +16,13 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.hadoop.job.preprocess; +package org.apache.pinot.ingestion.preprocess; import com.google.common.base.Preconditions; import java.io.IOException; import java.util.List; import org.apache.hadoop.fs.Path; -import org.apache.pinot.hadoop.utils.preprocess.DataFileUtils; +import org.apache.pinot.ingestion.utils.preprocess.DataFileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/OrcDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/OrcDataPreprocessingHelper.java similarity index 86% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/OrcDataPreprocessingHelper.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/OrcDataPreprocessingHelper.java index aec0bb0..6168426 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/OrcDataPreprocessingHelper.java +++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/OrcDataPreprocessingHelper.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.hadoop.job.preprocess; +package org.apache.pinot.ingestion.preprocess; import com.google.common.base.Preconditions; import java.io.IOException; @@ -43,10 +43,10 @@ import org.apache.orc.mapred.OrcStruct; import org.apache.orc.mapred.OrcValue; import org.apache.orc.mapreduce.OrcInputFormat; import org.apache.orc.mapreduce.OrcOutputFormat; -import org.apache.pinot.hadoop.job.mappers.OrcDataPreprocessingMapper; -import org.apache.pinot.hadoop.job.partitioners.OrcDataPreprocessingPartitioner; -import org.apache.pinot.hadoop.job.reducers.OrcDataPreprocessingReducer; -import org.apache.pinot.hadoop.utils.preprocess.HadoopUtils; +import org.apache.pinot.ingestion.preprocess.mappers.OrcDataPreprocessingMapper; +import org.apache.pinot.ingestion.preprocess.partitioners.OrcDataPreprocessingPartitioner; +import org.apache.pinot.ingestion.preprocess.reducers.OrcDataPreprocessingReducer; +import org.apache.pinot.ingestion.utils.preprocess.HadoopUtils; import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory; import org.apache.pinot.spi.utils.StringUtils; import org.slf4j.Logger; @@ -61,33 +61,24 @@ public class OrcDataPreprocessingHelper extends DataPreprocessingHelper { } @Override - Class<? extends Partitioner> getPartitioner() { + public Class<? 
extends Partitioner> getPartitioner() { return OrcDataPreprocessingPartitioner.class; } @Override - void setUpMapperReducerConfigs(Job job) { - TypeDescription orcSchema = getOrcSchema(_sampleRawDataPath); - String orcSchemaString = orcSchema.toString(); - LOGGER.info("Orc schema is: {}", orcSchemaString); - validateConfigsAgainstSchema(orcSchema); - - job.setInputFormatClass(OrcInputFormat.class); - job.setMapperClass(OrcDataPreprocessingMapper.class); - job.setMapOutputValueClass(OrcValue.class); - Configuration jobConf = job.getConfiguration(); - OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA.setString(jobConf, orcSchemaString); + public Object getSchema(Path inputPathDir) + throws IOException { + return getOrcSchema(inputPathDir); + } - job.setReducerClass(OrcDataPreprocessingReducer.class); - // Use LazyOutputFormat to avoid creating empty files. - LazyOutputFormat.setOutputFormatClass(job, OrcOutputFormat.class); - job.setOutputKeyClass(NullWritable.class); - job.setOutputValueClass(OrcStruct.class); - OrcConf.MAPRED_OUTPUT_SCHEMA.setString(jobConf, orcSchemaString); + @Override + public void validateConfigsAgainstSchema(Object schema) { + TypeDescription orcSchema = (TypeDescription) schema; + validateConfigsAgainstSchema(orcSchema); } @Override - String getSampleTimeColumnValue(String timeColumnName) + public String getSampleTimeColumnValue(String timeColumnName) throws IOException { try (Reader reader = OrcFile .createReader(_sampleRawDataPath, OrcFile.readerOptions(HadoopUtils.DEFAULT_CONFIGURATION))) { diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/SampleTimeColumnExtractable.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/SampleTimeColumnExtractable.java new file mode 100644 index 0000000..5245330 --- /dev/null +++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/SampleTimeColumnExtractable.java @@ -0,0 +1,9 @@ +package org.apache.pinot.ingestion.preprocess; + +import java.io.IOException; + + +public interface SampleTimeColumnExtractable { + + String getSampleTimeColumnValue(String timeColumnName) throws IOException; +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/AvroDataPreprocessingMapper.java similarity index 95% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/AvroDataPreprocessingMapper.java index 6278e8e..31229f0 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/AvroDataPreprocessingMapper.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.hadoop.job.mappers; +package org.apache.pinot.ingestion.preprocess.mappers; import com.google.common.base.Preconditions; import java.io.IOException; @@ -27,8 +27,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper; -import org.apache.pinot.hadoop.job.InternalConfigConstants; -import org.apache.pinot.hadoop.utils.preprocess.DataPreprocessingUtils; +import org.apache.pinot.ingestion.utils.InternalConfigConstants; +import org.apache.pinot.ingestion.utils.DataPreprocessingUtils; import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor; import org.apache.pinot.spi.data.FieldSpec; import org.slf4j.Logger; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/OrcDataPreprocessingMapper.java similarity index 93% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/OrcDataPreprocessingMapper.java index d7d0694..19beaa6 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/OrcDataPreprocessingMapper.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.hadoop.job.mappers; +package org.apache.pinot.ingestion.preprocess.mappers; import com.google.common.base.Preconditions; import java.io.IOException; @@ -27,9 +27,9 @@ import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper; import org.apache.orc.mapred.OrcStruct; import org.apache.orc.mapred.OrcValue; -import org.apache.pinot.hadoop.job.InternalConfigConstants; -import org.apache.pinot.hadoop.utils.preprocess.DataPreprocessingUtils; -import org.apache.pinot.hadoop.utils.preprocess.OrcUtils; +import org.apache.pinot.ingestion.utils.InternalConfigConstants; +import org.apache.pinot.ingestion.utils.DataPreprocessingUtils; +import org.apache.pinot.ingestion.utils.preprocess.OrcUtils; import org.apache.pinot.spi.data.FieldSpec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/SegmentPreprocessingMapper.java similarity index 98% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/SegmentPreprocessingMapper.java index 3d3fcec..4d8e2f2 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/SegmentPreprocessingMapper.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.hadoop.job.mappers; +package org.apache.pinot.ingestion.preprocess.mappers; import com.google.common.base.Preconditions; import java.io.IOException; @@ -30,7 +30,7 @@ import org.apache.avro.mapreduce.AvroJob; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Mapper; -import org.apache.pinot.hadoop.job.InternalConfigConstants; +import org.apache.pinot.ingestion.utils.InternalConfigConstants; import org.apache.pinot.ingestion.common.JobConfigConstants; import org.apache.pinot.segment.spi.creator.name.NormalizedDateSegmentNameGenerator; import org.apache.pinot.spi.data.DateTimeFieldSpec; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/AvroDataPreprocessingPartitioner.java similarity index 96% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/AvroDataPreprocessingPartitioner.java index 74799c7..d833964 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/AvroDataPreprocessingPartitioner.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.hadoop.job.partitioners; +package org.apache.pinot.ingestion.preprocess.partitioners; import com.google.common.base.Preconditions; import org.apache.avro.generic.GenericRecord; @@ -25,7 +25,7 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Partitioner; -import org.apache.pinot.hadoop.job.InternalConfigConstants; +import org.apache.pinot.ingestion.utils.InternalConfigConstants; import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor; import org.apache.pinot.segment.spi.partition.PartitionFunction; import org.slf4j.Logger; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/GenericPartitioner.java similarity index 88% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/GenericPartitioner.java index 7ca22cf..f9c1467 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/GenericPartitioner.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.hadoop.job.partitioners; +package org.apache.pinot.ingestion.preprocess.partitioners; import org.apache.avro.generic.GenericRecord; import org.apache.avro.mapred.AvroValue; @@ -27,9 +27,9 @@ import org.apache.pinot.segment.spi.partition.PartitionFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.pinot.hadoop.job.InternalConfigConstants.NUM_PARTITIONS_CONFIG; -import static org.apache.pinot.hadoop.job.InternalConfigConstants.PARTITION_COLUMN_CONFIG; -import static org.apache.pinot.hadoop.job.InternalConfigConstants.PARTITION_FUNCTION_CONFIG; +import static org.apache.pinot.ingestion.utils.InternalConfigConstants.NUM_PARTITIONS_CONFIG; +import static org.apache.pinot.ingestion.utils.InternalConfigConstants.PARTITION_COLUMN_CONFIG; +import static org.apache.pinot.ingestion.utils.InternalConfigConstants.PARTITION_FUNCTION_CONFIG; public class GenericPartitioner<T> extends Partitioner<T, AvroValue<GenericRecord>> implements Configurable { diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/OrcDataPreprocessingPartitioner.java similarity index 95% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/OrcDataPreprocessingPartitioner.java index bef2cef..d9b509d 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java +++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/OrcDataPreprocessingPartitioner.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.hadoop.job.partitioners; +package org.apache.pinot.ingestion.preprocess.partitioners; import com.google.common.base.Preconditions; import java.util.List; @@ -26,8 +26,8 @@ import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.orc.mapred.OrcStruct; import org.apache.orc.mapred.OrcValue; -import org.apache.pinot.hadoop.job.InternalConfigConstants; -import org.apache.pinot.hadoop.utils.preprocess.OrcUtils; +import org.apache.pinot.ingestion.utils.InternalConfigConstants; +import org.apache.pinot.ingestion.utils.preprocess.OrcUtils; import org.apache.pinot.segment.spi.partition.PartitionFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/PartitionFunctionFactory.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/PartitionFunctionFactory.java similarity index 98% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/PartitionFunctionFactory.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/PartitionFunctionFactory.java index 0ba57db..1826bdc 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/PartitionFunctionFactory.java +++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/PartitionFunctionFactory.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.hadoop.job.partitioners; +package org.apache.pinot.ingestion.preprocess.partitioners; import java.util.HashMap; import java.util.Map; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/AvroDataPreprocessingReducer.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/reducers/AvroDataPreprocessingReducer.java similarity index 96% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/AvroDataPreprocessingReducer.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/reducers/AvroDataPreprocessingReducer.java index 5fcbf10..4f236a4 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/AvroDataPreprocessingReducer.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/reducers/AvroDataPreprocessingReducer.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.hadoop.job.reducers; +package org.apache.pinot.ingestion.preprocess.reducers; import java.io.IOException; import org.apache.avro.generic.GenericRecord; @@ -27,7 +27,7 @@ import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Reducer; -import org.apache.pinot.hadoop.job.InternalConfigConstants; +import org.apache.pinot.ingestion.utils.InternalConfigConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/OrcDataPreprocessingReducer.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/reducers/OrcDataPreprocessingReducer.java similarity index 96% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/OrcDataPreprocessingReducer.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/reducers/OrcDataPreprocessingReducer.java index a3387a2..10d3f10 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/OrcDataPreprocessingReducer.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/reducers/OrcDataPreprocessingReducer.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.hadoop.job.reducers; +package org.apache.pinot.ingestion.preprocess.reducers; import java.io.IOException; import org.apache.commons.lang3.RandomStringUtils; @@ -27,7 +27,7 @@ import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.orc.mapred.OrcStruct; import org.apache.orc.mapred.OrcValue; -import org.apache.pinot.hadoop.job.InternalConfigConstants; +import org.apache.pinot.ingestion.utils.InternalConfigConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataPreprocessingUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/DataPreprocessingUtils.java similarity index 98% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataPreprocessingUtils.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/DataPreprocessingUtils.java index 4cc6f75..8450179 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataPreprocessingUtils.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/DataPreprocessingUtils.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.hadoop.utils.preprocess; +package org.apache.pinot.ingestion.utils; import java.util.Set; import org.apache.hadoop.io.DoubleWritable; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/InternalConfigConstants.java similarity index 98% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/InternalConfigConstants.java index 3701db2..e85a3e8 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/InternalConfigConstants.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.hadoop.job; +package org.apache.pinot.ingestion.utils; /** * Internal-only constants for Hadoop MapReduce jobs. 
These constants are propagated across different segment creation diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataFileUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/DataFileUtils.java similarity index 97% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataFileUtils.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/DataFileUtils.java index 58e1c1d..a6e4ca7 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataFileUtils.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/DataFileUtils.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.hadoop.utils.preprocess; +package org.apache.pinot.ingestion.utils.preprocess; import com.google.common.base.Preconditions; import java.io.IOException; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/HadoopUtils.java similarity index 96% copy from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java copy to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/HadoopUtils.java index 0596259..e4cdf5e 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/HadoopUtils.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.hadoop.utils.preprocess; +package org.apache.pinot.ingestion.utils.preprocess; import java.io.IOException; import org.apache.hadoop.conf.Configuration; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/OrcUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/OrcUtils.java similarity index 98% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/OrcUtils.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/OrcUtils.java index dcfc3b5..09eb218 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/OrcUtils.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/OrcUtils.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.hadoop.utils.preprocess; +package org.apache.pinot.ingestion.utils.preprocess; import org.apache.hadoop.hive.serde2.io.DateWritable; import org.apache.hadoop.io.BooleanWritable; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/TextComparator.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/TextComparator.java similarity index 96% rename from pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/TextComparator.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/TextComparator.java index 65f1222..05a437d 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/TextComparator.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/TextComparator.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.hadoop.utils.preprocess; +package org.apache.pinot.ingestion.utils.preprocess; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparator; diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SparkSegmentPreprocessingJob.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SparkSegmentPreprocessingJob.java new file mode 100644 index 0000000..b4b72a3 --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SparkSegmentPreprocessingJob.java @@ -0,0 +1,89 @@ +package org.apache.pinot.spark.jobs; + +import java.io.IOException; +import java.util.Collections; +import java.util.Properties; +import org.apache.hadoop.fs.Path; +import org.apache.pinot.ingestion.jobs.SegmentPreprocessingJob; +import org.apache.pinot.spark.jobs.preprocess.SparkDataPreprocessingHelper; +import org.apache.pinot.spark.jobs.preprocess.SparkDataPreprocessingHelperFactory; +import org.apache.pinot.spark.utils.HadoopUtils; +import org.apache.pinot.spark.utils.PinotSparkJobPreparationHelper; +import org.apache.spark.SparkContext; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A Spark job which provides partitioning, sorting, and resizing against the input files, which is raw data in either Avro or Orc format. + * Thus, the output files are partitioned, sorted, resized after this job. + * In order to run this job, the following configs need to be specified in job properties: + * * enable.preprocessing: false by default. Enables preprocessing job. 
+ */ +public class SparkSegmentPreprocessingJob extends SegmentPreprocessingJob { + private static final Logger LOGGER = LoggerFactory.getLogger(SparkSegmentPreprocessingJob.class); + + public SparkSegmentPreprocessingJob(Properties properties) { + super(properties); + } + + @Override + protected void run() + throws Exception { + if (!_enablePreprocessing) { + LOGGER.info("Pre-processing job is disabled."); + return; + } else { + LOGGER.info("Starting {}", getClass().getSimpleName()); + } + + setTableConfigAndSchema(); + fetchPreProcessingOperations(); + fetchPartitioningConfig(); + fetchSortingConfig(); + fetchResizingConfig(); + + // Cleans up preprocessed output dir if exists + cleanUpPreprocessedOutputs(_preprocessedOutputDir); + + SparkDataPreprocessingHelper dataPreprocessingHelper = + SparkDataPreprocessingHelperFactory.generateDataPreprocessingHelper(_inputSegmentDir, _preprocessedOutputDir); + dataPreprocessingHelper + .registerConfigs(_tableConfig, _pinotTableSchema, _partitionColumn, _numPartitions, _partitionFunction, + _sortingColumn, _sortingColumnType, _numOutputFiles, _maxNumRecordsPerFile); + + // Set up and execute spark job. + JavaSparkContext javaSparkContext = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate()); + addDepsJarToDistributedCache(javaSparkContext); + + SparkSession sparkSession = SparkSession.builder().appName(SparkSegmentPreprocessingJob.class.getSimpleName()).getOrCreate(); + + dataPreprocessingHelper.setUpAndExecuteJob(sparkSession); + } + + public void setUpAndExecuteJob(JavaSparkContext sparkContext) { + } + + /** + * Cleans up outputs in preprocessed output directory. 
+ */ + public static void cleanUpPreprocessedOutputs(Path preprocessedOutputDir) + throws IOException { + if (HadoopUtils.DEFAULT_FILE_SYSTEM.exists(preprocessedOutputDir)) { + LOGGER.warn("Found output folder {}, deleting", preprocessedOutputDir); + HadoopUtils.DEFAULT_FILE_SYSTEM.delete(preprocessedOutputDir, true); + } + } + + protected void addDepsJarToDistributedCache(JavaSparkContext sparkContext) + throws IOException { + if (_pathToDependencyJar != null) { + PinotSparkJobPreparationHelper + .addDepsJarToDistributedCacheHelper(HadoopUtils.DEFAULT_FILE_SYSTEM, sparkContext, + _pathToDependencyJar); + } + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkAvroDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkAvroDataPreprocessingHelper.java new file mode 100644 index 0000000..3374f3c --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkAvroDataPreprocessingHelper.java @@ -0,0 +1,39 @@ +package org.apache.pinot.spark.jobs.preprocess; + +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.fs.Path; +import org.apache.pinot.ingestion.preprocess.DataPreprocessingHelper; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; +import scala.collection.JavaConverters; + + +public class SparkAvroDataPreprocessingHelper extends SparkDataPreprocessingHelper { + public SparkAvroDataPreprocessingHelper(DataPreprocessingHelper dataPreprocessingHelper) { + super(dataPreprocessingHelper); + } + + @Override + public void setUpAndExecuteJob(SparkSession sparkSession) { + + JavaConverters.asJavaCollectionConverter(convertPathsToStrings(_dataPreprocessingHelper._inputDataPaths)) + 
sparkSession.read().format("avro").load(convertPathsToStrings(_dataPreprocessingHelper._inputDataPaths)); + + } + + @Override + public void setUpAndExecuteJob(JavaSparkContext sparkContext) { + JavaRDD<String> inputRDD = sparkContext.textFile(_dataPreprocessingHelper._inputDataPaths.toString(), _dataPreprocessingHelper._numOutputFiles); + } + + private Iterable<String> convertPathsToStrings(List<Path> paths) { + List<String> stringList = new ArrayList<>(); + for (Path path : paths) { + stringList.add(path.toString()); + } + stringList.it + return stringList.iterator(); + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingHelper.java new file mode 100644 index 0000000..54520df --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingHelper.java @@ -0,0 +1,27 @@ +package org.apache.pinot.spark.jobs.preprocess; + +import org.apache.pinot.ingestion.preprocess.DataPreprocessingHelper; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.spark.sql.SparkSession; + + +public abstract class SparkDataPreprocessingHelper { + + protected DataPreprocessingHelper _dataPreprocessingHelper; + + public SparkDataPreprocessingHelper(DataPreprocessingHelper dataPreprocessingHelper) { + _dataPreprocessingHelper = dataPreprocessingHelper; + } + + public void registerConfigs(TableConfig tableConfig, Schema tableSchema, String partitionColumn, int numPartitions, + String partitionFunction, String sortingColumn, FieldSpec.DataType sortingColumnType, int numOutputFiles, + int maxNumRecordsPerFile) { + _dataPreprocessingHelper + 
.registerConfigs(tableConfig, tableSchema, partitionColumn, numPartitions, partitionFunction, sortingColumn, + sortingColumnType, numOutputFiles, maxNumRecordsPerFile); + } + + public abstract void setUpAndExecuteJob(SparkSession sparkSession); +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingHelperFactory.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingHelperFactory.java new file mode 100644 index 0000000..8f6a92e --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingHelperFactory.java @@ -0,0 +1,39 @@ +package org.apache.pinot.spark.jobs.preprocess; + +import com.google.common.base.Preconditions; +import java.io.IOException; +import java.util.List; +import org.apache.hadoop.fs.Path; +import org.apache.pinot.ingestion.preprocess.AvroDataPreprocessingHelper; +import org.apache.pinot.ingestion.preprocess.OrcDataPreprocessingHelper; +import org.apache.pinot.ingestion.utils.preprocess.DataFileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class SparkDataPreprocessingHelperFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(SparkDataPreprocessingHelperFactory.class); + + public static SparkDataPreprocessingHelper generateDataPreprocessingHelper(Path inputPaths, Path outputPath) + throws IOException { + final List<Path> avroFiles = DataFileUtils.getDataFiles(inputPaths, DataFileUtils.AVRO_FILE_EXTENSION); + final List<Path> orcFiles = DataFileUtils.getDataFiles(inputPaths, DataFileUtils.ORC_FILE_EXTENSION); + + int numAvroFiles = avroFiles.size(); + int numOrcFiles = orcFiles.size(); + Preconditions.checkState(numAvroFiles == 0 || numOrcFiles == 0, + "Cannot preprocess mixed AVRO files: %s and ORC files: %s in 
directories: %s", avroFiles, orcFiles, + inputPaths); + Preconditions + .checkState(numAvroFiles > 0 || numOrcFiles > 0, "Failed to find any AVRO or ORC file in directories: %s", + inputPaths); + + if (numAvroFiles > 0) { + LOGGER.info("Found AVRO files: {} in directories: {}", avroFiles, inputPaths); + return new SparkAvroDataPreprocessingHelper(new AvroDataPreprocessingHelper(avroFiles, outputPath)); + } else { + LOGGER.info("Found ORC files: {} in directories: {}", orcFiles, inputPaths); + return new SparkOrcDataPreprocessingHelper(new OrcDataPreprocessingHelper(orcFiles, outputPath)); + } + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkOrcDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkOrcDataPreprocessingHelper.java new file mode 100644 index 0000000..f8340ed --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkOrcDataPreprocessingHelper.java @@ -0,0 +1,16 @@ +package org.apache.pinot.spark.jobs.preprocess; + +import org.apache.pinot.ingestion.preprocess.DataPreprocessingHelper; +import org.apache.spark.sql.SparkSession; + + +public class SparkOrcDataPreprocessingHelper extends SparkDataPreprocessingHelper { + public SparkOrcDataPreprocessingHelper(DataPreprocessingHelper dataPreprocessingHelper) { + super(dataPreprocessingHelper); + } + + @Override + public void setUpAndExecuteJob(SparkSession sparkSession) { + + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/utils/HadoopUtils.java similarity index 96% rename from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java rename to pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/utils/HadoopUtils.java index 0596259..8ee89cf 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/utils/HadoopUtils.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.hadoop.utils.preprocess; +package org.apache.pinot.spark.utils; import java.io.IOException; import org.apache.hadoop.conf.Configuration; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
