This is an automated email from the ASF dual-hosted git repository. jenniferdai pushed a commit to branch inputPathCompat in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit a4d2754778715e0c3182d93fe267df26c423830f Author: Jennifer Dai <[email protected]> AuthorDate: Tue Jul 23 12:52:34 2019 -0700 Compatible resizing --- .../name/NormalizedDateSegmentNameGenerator.java | 18 +- .../pinot/hadoop/job/InternalConfigConstants.java | 21 ++ .../pinot/hadoop/job/SegmentPreprocessingJob.java | 216 ++++++++++++++++++++- .../job/mappers/SegmentPreprocessingMapper.java | 41 +++- .../job/partitioners/GenericPartitioner.java | 8 +- ...mentBuildPushOfflineClusterIntegrationTest.java | 87 ++++++++- 6 files changed, 376 insertions(+), 15 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGenerator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGenerator.java index eadd3d9..ff1ecc1 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGenerator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGenerator.java @@ -32,16 +32,20 @@ import org.apache.pinot.common.data.TimeGranularitySpec.TimeFormat; * Segment name generator that normalizes the date to human readable format. 
*/ public class NormalizedDateSegmentNameGenerator implements SegmentNameGenerator { - private final String _segmentNamePrefix; - private final boolean _excludeSequenceId; - private final boolean _appendPushType; + private String _segmentNamePrefix; + private boolean _excludeSequenceId; + private boolean _appendPushType; // For APPEND tables - private final SimpleDateFormat _outputSDF; + private SimpleDateFormat _outputSDF; // For EPOCH time format - private final TimeUnit _inputTimeUnit; + private TimeUnit _inputTimeUnit; // For SIMPLE_DATE_FORMAT time format - private final SimpleDateFormat _inputSDF; + private SimpleDateFormat _inputSDF; + + public NormalizedDateSegmentNameGenerator(@Nullable String pushFrequency, @Nullable TimeUnit timeType, @Nullable String timeFormat) { + this("myTable", null, false, "APPEND", pushFrequency, timeType, timeFormat); + } public NormalizedDateSegmentNameGenerator(String tableName, @Nullable String segmentNamePrefix, boolean excludeSequenceId, @Nullable String pushType, @Nullable String pushFrequency, @Nullable TimeUnit timeType, @@ -98,7 +102,7 @@ public class NormalizedDateSegmentNameGenerator implements SegmentNameGenerator * @param timeValue Time value * @return Normalized date string */ - private String getNormalizedDate(Object timeValue) { + public String getNormalizedDate(Object timeValue) { if (_inputTimeUnit != null) { return _outputSDF.format(new Date(_inputTimeUnit.toMillis(Long.parseLong(timeValue.toString())))); } else { diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java new file mode 100644 index 0000000..d674742 --- /dev/null +++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java @@ -0,0 +1,21 @@ +package org.apache.pinot.hadoop.job; + +/** + * Internal-only constants for Hadoop MapReduce jobs.
These constants are propagated across different segment creation + * jobs. They are not meant to be set externally. + */ +public class InternalConfigConstants { + public static final String TIME_COLUMN_CONFIG = "time.column"; + public static final String TIME_COLUMN_VALUE = "time.column.value"; + public static final String IS_APPEND = "is.append"; + public static final String SEGMENT_PUSH_FREQUENCY = "segment.push.frequency"; + public static final String SEGMENT_TIME_TYPE = "segment.time.type"; + public static final String SEGMENT_TIME_FORMAT = "segment.time.format"; + + // Partitioning configs + public static final String PARTITION_COLUMN_CONFIG = "partition.column"; + public static final String NUM_PARTITIONS_CONFIG = "num.partitions"; + public static final String PARTITION_FUNCTION_CONFIG = "partition.function"; + + public static final String SORTED_COLUMN_CONFIG = "sorted.column"; +} diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java index a3f3901..364ff08 100644 --- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java +++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java @@ -20,6 +20,8 @@ package org.apache.pinot.hadoop.job; import com.google.common.base.Preconditions; import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Properties; @@ -30,21 +32,43 @@ import org.apache.avro.Schema; import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.mapred.AvroKey; +import org.apache.avro.mapred.AvroValue; +import org.apache.avro.mapreduce.AvroJob; +import org.apache.avro.mapreduce.AvroKeyOutputFormat; +import org.apache.avro.mapreduce.AvroMultipleOutputs; import 
org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.JobContext; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; import org.apache.pinot.common.config.ColumnPartitionConfig; import org.apache.pinot.common.config.IndexingConfig; import org.apache.pinot.common.config.SegmentPartitionConfig; +import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig; import org.apache.pinot.common.config.TableConfig; import org.apache.pinot.common.config.TableCustomConfig; +import org.apache.pinot.hadoop.io.CombineAvroKeyInputFormat; +import org.apache.pinot.hadoop.job.mappers.SegmentPreprocessingMapper; +import org.apache.pinot.hadoop.job.partitioners.GenericPartitioner; +import org.apache.pinot.hadoop.job.reducers.SegmentPreprocessingReducer; +import org.apache.pinot.hadoop.utils.JobPreparationHelper; import org.apache.pinot.hadoop.utils.PushLocation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hadoop.mapreduce.MRJobConfig.*; +import static org.apache.hadoop.security.UserGroupInformation.*; +import static org.apache.pinot.hadoop.job.InternalConfigConstants.*; + /** * A Hadoop job which provides partitioning, sorting, and resizing against the input files, which is raw data in Avro format. 
@@ -79,6 +103,7 @@ public class SegmentPreprocessingJob extends BaseSegmentJob { private final String _defaultPermissionsMask; private TableConfig _tableConfig; + private org.apache.pinot.common.data.Schema _schema; protected FileSystem _fileSystem; public SegmentPreprocessingJob(final Properties properties) { @@ -137,8 +162,195 @@ public class SegmentPreprocessingJob extends BaseSegmentJob { public void run() throws Exception { - _logger.info("Pre-processing job is disabled."); - return; + // TODO: Remove once the job is ready + _enablePartitioning = false; + _enableSorting = false; + _enableResizing = false; + + if (!_enablePartitioning && !_enableSorting && !_enableResizing) { + _logger.info("Pre-processing job is disabled."); + return; + } else { + _logger.info("Starting {}", getClass().getSimpleName()); + } + + _fileSystem = FileSystem.get(_conf); + final List<Path> inputDataPath = getDataFilePaths(_inputSegmentDir); + + if (_fileSystem.exists(_preprocessedOutputDir)) { + _logger.warn("Found the output folder {}, deleting it", _preprocessedOutputDir); + _fileSystem.delete(_preprocessedOutputDir, true); + } + JobPreparationHelper.setDirPermission(_fileSystem, _preprocessedOutputDir, _defaultPermissionsMask); + + // If push locations, table config, and schema are not configured, this does not necessarily mean that segments + // cannot be created. We should allow the user to go to the next step rather than failing the job. + if (_pushLocations.isEmpty()) { + _logger.error("Push locations cannot be empty. " + + "They are needed to get the table config and schema needed for this step. Skipping pre-processing"); + return; + } + + try (ControllerRestApi controllerRestApi = getControllerRestApi()) { + _tableConfig = controllerRestApi.getTableConfig(); + _schema = controllerRestApi.getSchema(); + } + + if (_tableConfig == null) { + _logger.error("Table config cannot be null. 
Skipping pre-processing"); + return; + } + + if (_schema == null) { + _logger.error("Schema cannot be null. Skipping pre-processing"); + return; + } + SegmentsValidationAndRetentionConfig validationConfig = _tableConfig.getValidationConfig(); + + _logger.info("Initializing a pre-processing job"); + Job job = Job.getInstance(_conf); + + // TODO: Serialize and deserialize validation config by creating toJson and fromJson + // If the use case is an append use case, check that one time unit is contained in one file. If there is more than one, + // the job should be disabled, as we should not resize for these use cases. Therefore, setting the time column name + // and value + if ("APPEND".equalsIgnoreCase(validationConfig.getSegmentPushType())) { + job.getConfiguration().set(IS_APPEND, "true"); + String timeColumnName = _schema.getTimeFieldSpec().getName(); + job.getConfiguration().set(TIME_COLUMN_CONFIG, timeColumnName); + job.getConfiguration().set(SEGMENT_TIME_TYPE, validationConfig.getTimeType().toString()); + job.getConfiguration().set(SEGMENT_TIME_FORMAT, _schema.getTimeFieldSpec().getOutgoingGranularitySpec().getTimeFormat()); + job.getConfiguration().set(SEGMENT_PUSH_FREQUENCY, validationConfig.getSegmentPushFrequency()); + try (DataFileStream<GenericRecord> dataStreamReader = getAvroReader(inputDataPath.get(0))) { + job.getConfiguration().set(TIME_COLUMN_VALUE, String.valueOf(dataStreamReader.next().get(timeColumnName))); + } + } + + if (_enablePartitioning) { + fetchPartitioningConfig(); + _logger.info("{}: {}", PARTITION_COLUMN_CONFIG, _partitionColumn); + _logger.info("{}: {}", NUM_PARTITIONS_CONFIG, _numberOfPartitions); + _logger.info("{}: {}", PARTITION_FUNCTION_CONFIG, _partitionFunction); + } + + if (_enableSorting) { + fetchSortingConfig(); + _logger.info("{}: {}", SORTED_COLUMN_CONFIG, _sortedColumn); + } + + if (_enableResizing) { + fetchResizingConfig(); + _logger.info("minimum number of output files: {}", _numberOfOutputFiles); + } +
job.getConfiguration().set(JobContext.JOB_NAME, this.getClass().getName()); + // Turn this on to always firstly use class paths that user specifies. + job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, "true"); + // Turn this off since we don't need an empty file in the output directory + job.getConfiguration().set(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, "false"); + + job.setJarByClass(SegmentPreprocessingJob.class); + + String hadoopTokenFileLocation = System.getenv(HADOOP_TOKEN_FILE_LOCATION); + if (hadoopTokenFileLocation != null) { + job.getConfiguration().set(MAPREDUCE_JOB_CREDENTIALS_BINARY, hadoopTokenFileLocation); + } + + // Schema configs. + Schema schema = getSchema(inputDataPath.get(0)); + _logger.info("Schema is: {}", schema.toString(true)); + + // Validates configs against schema. + validateConfigsAgainstSchema(schema); + + // Mapper configs. + job.setMapperClass(SegmentPreprocessingMapper.class); + job.setMapOutputKeyClass(AvroKey.class); + job.setMapOutputValueClass(AvroValue.class); + job.getConfiguration().setInt(JobContext.NUM_MAPS, inputDataPath.size()); + + // Reducer configs. + job.setReducerClass(SegmentPreprocessingReducer.class); + job.setOutputKeyClass(AvroKey.class); + job.setOutputValueClass(NullWritable.class); + + AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, schema); + AvroMultipleOutputs.setCountersEnabled(job, true); + // Use LazyOutputFormat to avoid creating empty files. + LazyOutputFormat.setOutputFormatClass(job, AvroKeyOutputFormat.class); + + // Input and output paths. + FileInputFormat.setInputPaths(job, _inputSegmentDir); + FileOutputFormat.setOutputPath(job, _preprocessedOutputDir); + _logger.info("Total number of files to be pre-processed: {}", inputDataPath.size()); + + // Set up mapper output key + Set<Schema.Field> fieldSet = new HashSet<>(); + + // Partition configs. + int numReduceTasks = (_numberOfPartitions != 0) ? 
_numberOfPartitions : inputDataPath.size(); + if (_partitionColumn != null) { + job.getConfiguration().set(JobConfigConstants.ENABLE_PARTITIONING, "true"); + job.setPartitionerClass(GenericPartitioner.class); + job.getConfiguration().set(PARTITION_COLUMN_CONFIG, _partitionColumn); + if (_partitionFunction != null) { + job.getConfiguration().set(PARTITION_FUNCTION_CONFIG, _partitionFunction); + } + job.getConfiguration().set(NUM_PARTITIONS_CONFIG, Integer.toString(numReduceTasks)); + } else { + if (_numberOfOutputFiles > 0) { + numReduceTasks = _numberOfOutputFiles; + } + // Partitioning is disabled. Adding hashcode as one of the fields to mapper output key. + // so that all the rows can be spread evenly. + addHashCodeField(fieldSet); + } + setMaxNumRecordsConfigIfSpecified(job); + job.setInputFormatClass(CombineAvroKeyInputFormat.class); + + _logger.info("Number of reduce tasks for pre-processing job: {}", numReduceTasks); + job.setNumReduceTasks(numReduceTasks); + + // Sort config. + if (_sortedColumn != null) { + _logger.info("Adding sorted column: {} to job config", _sortedColumn); + job.getConfiguration().set(SORTED_COLUMN_CONFIG, _sortedColumn); + + addSortedColumnField(schema, fieldSet); + } else { + // If sorting is disabled, hashcode will be the only factor for sort/group comparator. + addHashCodeField(fieldSet); + } + + // Creates a wrapper for the schema of output key in mapper. + Schema mapperOutputKeySchema = Schema.createRecord(/*name*/"record", /*doc*/"", /*namespace*/"", false); + mapperOutputKeySchema.setFields(new ArrayList<>(fieldSet)); + _logger.info("Mapper output schema: {}", mapperOutputKeySchema); + + AvroJob.setInputKeySchema(job, schema); + AvroJob.setMapOutputKeySchema(job, mapperOutputKeySchema); + AvroJob.setMapOutputValueSchema(job, schema); + AvroJob.setOutputKeySchema(job, schema); + + // Since we aren't extending AbstractHadoopJob, we need to add the jars for the job to + // distributed cache ourselves. 
Take a look at how the addFilesToDistributedCache is + // implemented so that you know what it does. + _logger.info("HDFS class path: " + _pathToDependencyJar); + if (_pathToDependencyJar != null) { + _logger.info("Copying jars locally."); + JobPreparationHelper.addDepsJarToDistributedCacheHelper(_fileSystem, job, _pathToDependencyJar); + } else { + _logger.info("Property '{}' not specified.", JobConfigConstants.PATH_TO_DEPS_JAR); + } + + long startTime = System.currentTimeMillis(); + // Submit the job for execution. + job.waitForCompletion(true); + if (!job.isSuccessful()) { + throw new RuntimeException("Job failed : " + job); + } + + _logger.info("Finished pre-processing job in {}ms", (System.currentTimeMillis() - startTime)); } @Nullable diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java index 2c4d012..508ce62 100644 --- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java +++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java @@ -20,6 +20,7 @@ package org.apache.pinot.hadoop.job.mappers; import com.google.common.base.Preconditions; import java.io.IOException; +import java.util.concurrent.TimeUnit; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; @@ -29,24 +30,49 @@ import org.apache.avro.mapreduce.AvroJob; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Mapper; +import org.apache.pinot.core.segment.name.NormalizedDateSegmentNameGenerator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.pinot.hadoop.job.InternalConfigConstants.*; import static org.apache.pinot.hadoop.job.JobConfigConstants.*; public class SegmentPreprocessingMapper extends 
Mapper<AvroKey<GenericRecord>, NullWritable, AvroKey<GenericRecord>, AvroValue<GenericRecord>> { private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPreprocessingMapper.class); private String _sortedColumn = null; + private String _timeColumn = null; private Schema _outputKeySchema; private Schema _outputSchema; private boolean _enablePartition = false; + private String _sampleNormalizedTimeColumnValue = null; + private NormalizedDateSegmentNameGenerator _normalizedDateSegmentNameGenerator = null; + private boolean _isAppend = false; @Override public void setup(final Context context) { Configuration configuration = context.getConfiguration(); - String sortedColumn = configuration.get("sorted.column"); + _isAppend = configuration.getBoolean(IS_APPEND, false); + + if (_isAppend) { + // Get time column name + _timeColumn = configuration.get(TIME_COLUMN_CONFIG); + + // Get sample time column value + String timeColumnValue = configuration.get(TIME_COLUMN_VALUE); + + String pushFrequency = configuration.get(SEGMENT_PUSH_FREQUENCY); + String timeType = configuration.get(SEGMENT_TIME_TYPE); + String timeFormat = configuration.get(SEGMENT_TIME_FORMAT); + TimeUnit timeUnit = TimeUnit.valueOf(timeType); + + // Normalize time column value + _normalizedDateSegmentNameGenerator = new NormalizedDateSegmentNameGenerator(pushFrequency, timeUnit, timeFormat); + _sampleNormalizedTimeColumnValue = _normalizedDateSegmentNameGenerator.getNormalizedDate(timeColumnValue); + } + + String sortedColumn = configuration.get(SORTED_COLUMN_CONFIG); // Logging the configs for the mapper LOGGER.info("Sorted Column: " + sortedColumn); if (sortedColumn != null) { @@ -61,6 +87,19 @@ public class SegmentPreprocessingMapper extends Mapper<AvroKey<GenericRecord>, N @Override public void map(AvroKey<GenericRecord> record, NullWritable value, final Context context) throws IOException, InterruptedException { + + if (_isAppend) { + // Normalize time column value and check against
sample value + String timeColumnValue = String.valueOf(record.datum().get(_timeColumn)); + String normalizedTimeColumnValue = _normalizedDateSegmentNameGenerator.getNormalizedDate(timeColumnValue); + + if (!normalizedTimeColumnValue.equals(_sampleNormalizedTimeColumnValue)) { + // TODO: Create a custom exception and gracefully catch this exception outside, changing what the path to input + // into segment creation should be + throw new IllegalArgumentException(String.format("Normalized time column value: %s does not match sample value: %s", normalizedTimeColumnValue, _sampleNormalizedTimeColumnValue)); + } + } + final GenericRecord inputRecord = record.datum(); final Schema schema = inputRecord.getSchema(); Preconditions.checkArgument(_outputSchema.equals(schema), "The schema of all avro files should be the same!"); diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java index 43d1396..ca7e312 100644 --- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java +++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java @@ -27,6 +27,8 @@ import org.apache.pinot.core.data.partition.PartitionFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.pinot.hadoop.job.InternalConfigConstants.*; + public class GenericPartitioner<T> extends Partitioner<T, AvroValue<GenericRecord>> implements Configurable { @@ -39,10 +41,10 @@ public class GenericPartitioner<T> extends Partitioner<T, AvroValue<GenericRecor @Override public void setConf(Configuration conf) { _configuration = conf; - _partitionColumn = _configuration.get("partition.column"); - _numPartitions = Integer.parseInt(_configuration.get("num.partitions")); + _partitionColumn = _configuration.get(PARTITION_COLUMN_CONFIG); + _numPartitions = Integer.parseInt(_configuration.get(NUM_PARTITIONS_CONFIG)); _partitionFunction = - PartitionFunctionFactory.getPartitionFunction(_configuration.get("partition.function",
null), _numPartitions); + PartitionFunctionFactory.getPartitionFunction(_configuration.get(PARTITION_FUNCTION_CONFIG, null), _numPartitions); LOGGER.info("The partition function is: " + _partitionFunction.getClass().getName()); LOGGER.info("The partition column is: " + _partitionColumn); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java index 49903ef..1bc48c3 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java @@ -19,26 +19,48 @@ package org.apache.pinot.integration.tests; import java.io.File; +import java.io.IOException; +import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster; +import org.apache.pinot.common.config.ColumnPartitionConfig; +import org.apache.pinot.common.config.SegmentPartitionConfig; import org.apache.pinot.common.data.Schema; +import org.apache.pinot.core.data.partition.PartitionFunction; +import org.apache.pinot.core.data.partition.PartitionFunctionFactory; import 
org.apache.pinot.core.indexsegment.generator.SegmentVersion; import org.apache.pinot.hadoop.job.JobConfigConstants; +import org.apache.pinot.hadoop.job.SegmentPreprocessingJob; import org.apache.pinot.hadoop.job.SegmentCreationJob; import org.apache.pinot.hadoop.job.SegmentTarPushJob; import org.apache.pinot.util.TestUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import static org.apache.pinot.hadoop.job.JobConfigConstants.*; + public class HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet { + private static final Logger LOGGER = LoggerFactory.getLogger(HadoopSegmentBuildPushOfflineClusterIntegrationTest.class); private static final int NUM_BROKERS = 1; private static final int NUM_SERVERS = 1; @@ -66,7 +88,7 @@ public class HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClu // Start the MR Yarn cluster final Configuration conf = new Configuration(); - _mrCluster = new MiniMRYarnCluster(getClass().getName(), 1); + _mrCluster = new MiniMRYarnCluster(getClass().getName(), 2); _mrCluster.init(conf); _mrCluster.start(); @@ -93,7 +115,7 @@ public class HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClu // Create the table addOfflineTable(getTableName(), _schema.getTimeColumnName(), _schema.getOutgoingTimeUnit().toString(), null, null, - getLoadMode(), SegmentVersion.v3, getInvertedIndexColumns(), getBloomFilterIndexColumns(), getTaskConfig(), null, null); + getLoadMode(), SegmentVersion.v3, getInvertedIndexColumns(), getBloomFilterIndexColumns(), getTaskConfig(), getSegmentPartitionConfig(), getSortedColumn()); // Generate and push Pinot segments from Hadoop generateAndPushSegmentsFromHadoop(); @@ -161,6 +183,25 @@ public class HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClu 
properties.setProperty(JobConfigConstants.PUSH_TO_HOSTS, getDefaultControllerConfiguration().getControllerHost()); properties.setProperty(JobConfigConstants.PUSH_TO_PORT, getDefaultControllerConfiguration().getControllerPort()); + Properties preComputeProperties = new Properties(); + preComputeProperties.putAll(properties); + preComputeProperties.setProperty(ENABLE_PARTITIONING, Boolean.TRUE.toString()); + preComputeProperties.setProperty(ENABLE_SORTING, Boolean.TRUE.toString()); + + preComputeProperties.setProperty(JobConfigConstants.PATH_TO_INPUT, _avroDir.getPath()); + preComputeProperties.setProperty(JobConfigConstants.PREPROCESS_PATH_TO_OUTPUT, _preprocessingDir.getPath()); + properties.setProperty(JobConfigConstants.PATH_TO_INPUT, _preprocessingDir.getPath()); + + // Run segment pre-processing job + SegmentPreprocessingJob segmentPreprocessingJob = new SegmentPreprocessingJob(preComputeProperties); + Configuration preComputeConfig = _mrCluster.getConfig(); + segmentPreprocessingJob.setConf(preComputeConfig); + segmentPreprocessingJob.run(); + LOGGER.info("Segment preprocessing job finished."); + + // Verify partitioning and sorting. + verifyPreprocessingJob(preComputeConfig); + // Run segment creation job SegmentCreationJob creationJob = new SegmentCreationJob(properties); Configuration config = _mrCluster.getConfig(); @@ -172,4 +213,46 @@ public class HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClu pushJob.setConf(_mrCluster.getConfig()); pushJob.run(); } + + private void verifyPreprocessingJob(Configuration preComputeConfig) throws IOException { + // Fetch partitioning config and sorting config. 
+ SegmentPartitionConfig segmentPartitionConfig = getSegmentPartitionConfig(); + Map.Entry<String, ColumnPartitionConfig> + entry = segmentPartitionConfig.getColumnPartitionMap().entrySet().iterator().next(); + String partitionColumn = entry.getKey(); + String partitionFunctionString = entry.getValue().getFunctionName(); + int numPartitions = entry.getValue().getNumPartitions(); + PartitionFunction partitionFunction = PartitionFunctionFactory.getPartitionFunction(partitionFunctionString, numPartitions); + String sortedColumn = getSortedColumn(); + + // Get output files. + FileSystem fileSystem = FileSystem.get(preComputeConfig); + FileStatus[] fileStatuses = fileSystem.listStatus(new Path(_preprocessingDir.getPath())); + Assert.assertEquals(fileStatuses.length, numPartitions, "Number of output file should be the same as the number of partitions."); + + Set<Integer> partitionIdSet = new HashSet<>(); + Object previousObject; + for (FileStatus fileStatus : fileStatuses) { + Path avroFile = fileStatus.getPath(); + DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(fileSystem.open(avroFile), new GenericDatumReader<>()); + + // Reset hash set and previous object + partitionIdSet.clear(); + previousObject = null; + while (dataFileStream.hasNext()) { + GenericRecord genericRecord = dataFileStream.next(); + partitionIdSet.add(partitionFunction.getPartition(genericRecord.get(partitionColumn))); + Assert.assertEquals(partitionIdSet.size(), 1, "Partition Id should be the same within a file."); + org.apache.avro.Schema sortedColumnSchema = genericRecord.getSchema().getField(sortedColumn).schema(); + Object currentObject = genericRecord.get(sortedColumn); + if (previousObject == null) { + previousObject = currentObject; + continue; + } + // The values of sorted column should be sorted in ascending order. 
+ Assert.assertTrue(GenericData.get().compare(previousObject, currentObject, sortedColumnSchema) <= 0); + previousObject = currentObject; + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
