This is an automated email from the ASF dual-hosted git repository.
jenniferdai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 551b5c3 Time-aware resizing (#4462)
551b5c3 is described below
commit 551b5c3c1d3aa6fa74667d84512bdffcceafc95d
Author: Jennifer Dai <[email protected]>
AuthorDate: Mon Jul 29 16:36:21 2019 -0700
Time-aware resizing (#4462)
* Adds a check during the pre-processing job prior to resizing. If the
table push type is APPEND, every record must have the same normalized time
value as a sample record taken from the first input file; if any record
differs, the mapper throws an exception. Other push types skip this check.
* Future work: we will throw a specific exception and determine which input
path to set for the subsequent segment creation job, whether it be the output
post-resizing or original avro files so that the job will proceed seamlessly.
* #4353
---
.../name/NormalizedDateSegmentNameGenerator.java | 18 +-
.../pinot/hadoop/job/InternalConfigConstants.java | 39 ++++
.../pinot/hadoop/job/SegmentPreprocessingJob.java | 253 ++++++++++++++++++---
.../job/mappers/SegmentPreprocessingMapper.java | 41 +++-
.../job/partitioners/GenericPartitioner.java | 8 +-
.../tests/BaseClusterIntegrationTest.java | 1 +
...mentBuildPushOfflineClusterIntegrationTest.java | 73 +++++-
7 files changed, 394 insertions(+), 39 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGenerator.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGenerator.java
index eadd3d9..ff1ecc1 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGenerator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGenerator.java
@@ -32,16 +32,20 @@ import
org.apache.pinot.common.data.TimeGranularitySpec.TimeFormat;
* Segment name generator that normalizes the date to human readable format.
*/
public class NormalizedDateSegmentNameGenerator implements
SegmentNameGenerator {
- private final String _segmentNamePrefix;
- private final boolean _excludeSequenceId;
- private final boolean _appendPushType;
+ private String _segmentNamePrefix;
+ private boolean _excludeSequenceId;
+ private boolean _appendPushType;
// For APPEND tables
- private final SimpleDateFormat _outputSDF;
+ private SimpleDateFormat _outputSDF;
// For EPOCH time format
- private final TimeUnit _inputTimeUnit;
+ private TimeUnit _inputTimeUnit;
// For SIMPLE_DATE_FORMAT time format
- private final SimpleDateFormat _inputSDF;
+ private SimpleDateFormat _inputSDF;
+
+ /**
+ * Convenience constructor for callers that only need {@link #getNormalizedDate(Object)}.
+ * Delegates to the full constructor with a placeholder table name ("myTable"), no segment name
+ * prefix, {@code excludeSequenceId = false} and the "APPEND" push type.
+ *
+ * @param pushFrequency Segment push frequency, may be null
+ * @param timeType Time unit of the time column, may be null
+ * @param timeFormat Time format of the time column, may be null
+ */
+ public NormalizedDateSegmentNameGenerator(@Nullable String pushFrequency,
@Nullable TimeUnit timeType, @Nullable String timeFormat) {
+ // Delegate with this(...): "new NormalizedDateSegmentNameGenerator(...)" would build and discard a
+ // second instance and leave every field of this instance unset.
+ this("myTable", null, false, "APPEND",
pushFrequency, timeType, timeFormat);
+ }
public NormalizedDateSegmentNameGenerator(String tableName, @Nullable String
segmentNamePrefix,
boolean excludeSequenceId, @Nullable String pushType, @Nullable String
pushFrequency, @Nullable TimeUnit timeType,
@@ -98,7 +102,7 @@ public class NormalizedDateSegmentNameGenerator implements
SegmentNameGenerator
* @param timeValue Time value
* @return Normalized date string
*/
- private String getNormalizedDate(Object timeValue) {
+ public String getNormalizedDate(Object timeValue) {
if (_inputTimeUnit != null) {
return _outputSDF.format(new
Date(_inputTimeUnit.toMillis(Long.parseLong(timeValue.toString()))));
} else {
diff --git
a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
new file mode 100644
index 0000000..5c08913
--- /dev/null
+++
b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.hadoop.job;
+
+/**
+ * Internal-only constants for Hadoop MapReduce jobs. These constants are
propagated across different segment creation
+ * jobs. They are not meant to be set externally.
+ */
+public final class InternalConfigConstants {
+ // Constants holder: not meant to be instantiated or extended.
+ private InternalConfigConstants() {
+ }
+
+ // Time-aware resizing configs
+ // Name of the time column in the Pinot table schema
+ public static final String TIME_COLUMN_CONFIG = "time.column";
+ // Sample time column value read from the first record of the first input file
+ public static final String TIME_COLUMN_VALUE = "time.column.value";
+ // Set to "true" when the table's segment push type is APPEND
+ public static final String IS_APPEND = "is.append";
+ // Segment push frequency from the table's validation config
+ public static final String SEGMENT_PUSH_FREQUENCY = "segment.push.frequency";
+ // Time type (time unit name) from the table's validation config
+ public static final String SEGMENT_TIME_TYPE = "segment.time.type";
+ // Time format of the outgoing granularity spec of the time field
+ public static final String SEGMENT_TIME_FORMAT = "segment.time.format";
+
+ // Partitioning configs
+ public static final String PARTITION_COLUMN_CONFIG = "partition.column";
+ public static final String NUM_PARTITIONS_CONFIG = "num.partitions";
+ public static final String PARTITION_FUNCTION_CONFIG = "partition.function";
+
+ // Sorting config
+ public static final String SORTED_COLUMN_CONFIG = "sorted.column";
+}
diff --git
a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java
b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java
index 65fbd73..d441ed4 100644
---
a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java
+++
b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java
@@ -20,6 +20,8 @@ package org.apache.pinot.hadoop.job;
import com.google.common.base.Preconditions;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -30,22 +32,44 @@ import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.avro.mapreduce.AvroJob;
+import org.apache.avro.mapreduce.AvroKeyOutputFormat;
+import org.apache.avro.mapreduce.AvroMultipleOutputs;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.pinot.common.config.ColumnPartitionConfig;
import org.apache.pinot.common.config.IndexingConfig;
import org.apache.pinot.common.config.SegmentPartitionConfig;
+import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.config.TableCustomConfig;
import org.apache.pinot.core.data.partition.PartitionFunctionFactory;
+import org.apache.pinot.hadoop.io.CombineAvroKeyInputFormat;
+import org.apache.pinot.hadoop.job.mappers.SegmentPreprocessingMapper;
+import org.apache.pinot.hadoop.job.partitioners.GenericPartitioner;
+import org.apache.pinot.hadoop.job.reducers.SegmentPreprocessingReducer;
+import org.apache.pinot.hadoop.utils.JobPreparationHelper;
import org.apache.pinot.hadoop.utils.PushLocation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.hadoop.mapreduce.MRJobConfig.*;
+import static org.apache.hadoop.security.UserGroupInformation.*;
+import static org.apache.pinot.hadoop.job.InternalConfigConstants.*;
+
/**
* A Hadoop job which provides partitioning, sorting, and resizing against the
input files, which is raw data in Avro format.
@@ -74,12 +98,14 @@ public class SegmentPreprocessingJob extends BaseSegmentJob
{
private final Path _preprocessedOutputDir;
protected final String _rawTableName;
protected final List<PushLocation> _pushLocations;
+ private DataFileStream<GenericRecord> _dataStreamReader;
// Optional.
private final Path _pathToDependencyJar;
private final String _defaultPermissionsMask;
private TableConfig _tableConfig;
+ private org.apache.pinot.common.data.Schema _pinotTableSchema;
protected FileSystem _fileSystem;
public SegmentPreprocessingJob(final Properties properties) {
@@ -138,8 +164,195 @@ public class SegmentPreprocessingJob extends
BaseSegmentJob {
+ /**
+ * Runs the pre-processing job.
+ *
+ * <p>Partitioning, sorting and resizing are currently force-disabled at the top of this method (see
+ * the TODO), so the method logs that the job is disabled and returns. Once enabled, it recreates the
+ * output directory, fetches the table config and schema from the controller, records the time-aware
+ * resizing settings for APPEND tables in the job configuration, configures the mapper, reducer,
+ * partitioner and Avro schemas, then submits the job and waits for it to complete.
+ *
+ * <p>The job is skipped (logged, no exception) when push locations are empty or when the table
+ * config or schema cannot be fetched.
+ *
+ * @throws Exception if setting up or running the job fails; a {@link RuntimeException} is thrown
+ * when the submitted job is not successful
+ */
public void run()
throws Exception {
- _logger.info("Pre-processing job is disabled.");
- return;
+ // TODO: Remove once the job is ready
+ _enablePartitioning = false;
+ _enableSorting = false;
+ _enableResizing = false;
+
+ if (!_enablePartitioning && !_enableSorting && !_enableResizing) {
+ _logger.info("Pre-processing job is disabled.");
+ return;
+ } else {
+ _logger.info("Starting {}", getClass().getSimpleName());
+ }
+
+ _fileSystem = FileSystem.get(_conf);
+ final List<Path> inputDataPath = getDataFilePaths(_inputSegmentDir);
+
+ if (_fileSystem.exists(_preprocessedOutputDir)) {
+ _logger.warn("Found the output folder {}, deleting it",
_preprocessedOutputDir);
+ _fileSystem.delete(_preprocessedOutputDir, true);
+ }
+ JobPreparationHelper.setDirPermission(_fileSystem, _preprocessedOutputDir,
_defaultPermissionsMask);
+
+ // If push locations, table config, and schema are not configured, this
does not necessarily mean that segments
+ // cannot be created. We should allow the user to go to the next step
rather than failing the job.
+ if (_pushLocations.isEmpty()) {
+ _logger.error("Push locations cannot be empty. "
+ + "They are needed to get the table config and schema needed for
this step. Skipping pre-processing");
+ return;
+ }
+
+ try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
+ _tableConfig = controllerRestApi.getTableConfig();
+ _pinotTableSchema = controllerRestApi.getSchema();
+ }
+
+ if (_tableConfig == null) {
+ _logger.error("Table config cannot be null. Skipping pre-processing");
+ return;
+ }
+
+ if (_pinotTableSchema == null) {
+ _logger.error("Schema cannot be null. Skipping pre-processing");
+ // Actually skip: the schema is dereferenced below for APPEND tables.
+ return;
+ }
+
+ SegmentsValidationAndRetentionConfig validationConfig =
_tableConfig.getValidationConfig();
+
+ _logger.info("Initializing a pre-processing job");
+ Job job = Job.getInstance(_conf);
+
+ // The reader is opened for every push type: getSchema() reads the Avro schema from it and
+ // cleanup() closes it, so it must not stay null for non-APPEND tables.
+ _dataStreamReader = getAvroReader(inputDataPath.get(0));
+
+ // TODO: Serialize and deserialize validation config by creating toJson
and fromJson
+ // If the use case is an append use case, check that one time unit is
contained in one file. If there is more than one,
+ // the job should be disabled, as we should not resize for these use
cases. Therefore, setting the time column name
+ // and value
+ // Constant-first comparison so that an unset push type is treated as non-APPEND.
+ if ("APPEND".equalsIgnoreCase(validationConfig.getSegmentPushType())) {
+ job.getConfiguration().set(IS_APPEND, "true");
+ String timeColumnName = _pinotTableSchema.getTimeFieldSpec().getName();
+ job.getConfiguration().set(TIME_COLUMN_CONFIG, timeColumnName);
+ job.getConfiguration().set(SEGMENT_TIME_TYPE,
validationConfig.getTimeType().toString());
+ job.getConfiguration().set(SEGMENT_TIME_FORMAT,
_pinotTableSchema.getTimeFieldSpec().getOutgoingGranularitySpec().getTimeFormat());
+ job.getConfiguration().set(SEGMENT_PUSH_FREQUENCY,
validationConfig.getSegmentPushFrequency());
+ Object sampleTimeValue = _dataStreamReader.next().get(timeColumnName);
+ Preconditions.checkState(sampleTimeValue != null, "Time column %s not found in the first record of %s",
timeColumnName, inputDataPath.get(0));
+ // Avro hands back Utf8/Long/Integer values rather than java.lang.String, so convert instead of casting.
+ job.getConfiguration().set(TIME_COLUMN_VALUE, sampleTimeValue.toString());
+ } else {
+ // Always publish the flag so that readers of IS_APPEND never see a missing (null) value.
+ job.getConfiguration().set(IS_APPEND, "false");
+ }
+
+ if (_enablePartitioning) {
+ fetchPartitioningConfig();
+ _logger.info("{}: {}", PARTITION_COLUMN_CONFIG, _partitionColumn);
+ _logger.info("{}: {}", NUM_PARTITIONS_CONFIG, _numberOfPartitions);
+ _logger.info("{}: {}", PARTITION_FUNCTION_CONFIG, _partitionFunction);
+ }
+
+ if (_enableSorting) {
+ fetchSortingConfig();
+ _logger.info("{}: {}", SORTED_COLUMN_CONFIG, _sortedColumn);
+ }
+
+ if (_enableResizing) {
+ fetchResizingConfig();
+ _logger.info("minimum number of output files: {}", _numberOfOutputFiles);
+ }
+
+ job.getConfiguration().set(JobContext.JOB_NAME, this.getClass().getName());
+ // Turn this on to always firstly use class paths that user specifies.
+ job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST,
"true");
+ // Turn this off since we don't need an empty file in the output directory
+
job.getConfiguration().set(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
"false");
+
+ job.setJarByClass(SegmentPreprocessingJob.class);
+
+ String hadoopTokenFileLocation = System.getenv(HADOOP_TOKEN_FILE_LOCATION);
+ if (hadoopTokenFileLocation != null) {
+ job.getConfiguration().set(MAPREDUCE_JOB_CREDENTIALS_BINARY,
hadoopTokenFileLocation);
+ }
+
+ // Avro Schema configs.
+ Schema avroSchema = getSchema(inputDataPath.get(0));
+ _logger.info("Schema is: {}", avroSchema.toString(true));
+
+ // Validates configs against schema.
+ validateConfigsAgainstSchema(avroSchema);
+
+ // Mapper configs.
+ job.setMapperClass(SegmentPreprocessingMapper.class);
+ job.setMapOutputKeyClass(AvroKey.class);
+ job.setMapOutputValueClass(AvroValue.class);
+ job.getConfiguration().setInt(JobContext.NUM_MAPS, inputDataPath.size());
+
+ // Reducer configs.
+ job.setReducerClass(SegmentPreprocessingReducer.class);
+ job.setOutputKeyClass(AvroKey.class);
+ job.setOutputValueClass(NullWritable.class);
+
+ AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class,
avroSchema);
+ AvroMultipleOutputs.setCountersEnabled(job, true);
+ // Use LazyOutputFormat to avoid creating empty files.
+ LazyOutputFormat.setOutputFormatClass(job, AvroKeyOutputFormat.class);
+
+ // Input and output paths.
+ FileInputFormat.setInputPaths(job, _inputSegmentDir);
+ FileOutputFormat.setOutputPath(job, _preprocessedOutputDir);
+ _logger.info("Total number of files to be pre-processed: {}",
inputDataPath.size());
+
+ // Set up mapper output key
+ Set<Schema.Field> fieldSet = new HashSet<>();
+
+ // Partition configs.
+ int numReduceTasks = (_numberOfPartitions != 0) ? _numberOfPartitions :
inputDataPath.size();
+ if (_partitionColumn != null) {
+ job.getConfiguration().set(JobConfigConstants.ENABLE_PARTITIONING,
"true");
+ job.setPartitionerClass(GenericPartitioner.class);
+ job.getConfiguration().set(PARTITION_COLUMN_CONFIG, _partitionColumn);
+ if (_partitionFunction != null) {
+ job.getConfiguration().set(PARTITION_FUNCTION_CONFIG,
_partitionFunction);
+ }
+ job.getConfiguration().set(NUM_PARTITIONS_CONFIG,
Integer.toString(numReduceTasks));
+ } else {
+ if (_numberOfOutputFiles > 0) {
+ numReduceTasks = _numberOfOutputFiles;
+ }
+ // Partitioning is disabled. Adding hashcode as one of the fields to
mapper output key.
+ // so that all the rows can be spread evenly.
+ addHashCodeField(fieldSet);
+ }
+ setMaxNumRecordsConfigIfSpecified(job);
+ job.setInputFormatClass(CombineAvroKeyInputFormat.class);
+
+ _logger.info("Number of reduce tasks for pre-processing job: {}",
numReduceTasks);
+ job.setNumReduceTasks(numReduceTasks);
+
+ // Sort config.
+ if (_sortedColumn != null) {
+ _logger.info("Adding sorted column: {} to job config", _sortedColumn);
+ job.getConfiguration().set(SORTED_COLUMN_CONFIG, _sortedColumn);
+
+ addSortedColumnField(avroSchema, fieldSet);
+ } else {
+ // If sorting is disabled, hashcode will be the only factor for
sort/group comparator.
+ addHashCodeField(fieldSet);
+ }
+
+ // Creates a wrapper for the schema of output key in mapper.
+ Schema mapperOutputKeySchema = Schema.createRecord(/*name*/"record",
/*doc*/"", /*namespace*/"", false);
+ mapperOutputKeySchema.setFields(new ArrayList<>(fieldSet));
+ _logger.info("Mapper output schema: {}", mapperOutputKeySchema);
+
+ AvroJob.setInputKeySchema(job, avroSchema);
+ AvroJob.setMapOutputKeySchema(job, mapperOutputKeySchema);
+ AvroJob.setMapOutputValueSchema(job, avroSchema);
+ AvroJob.setOutputKeySchema(job, avroSchema);
+
+ // Since we aren't extending AbstractHadoopJob, we need to add the jars
for the job to
+ // distributed cache ourselves. Take a look at how the
addFilesToDistributedCache is
+ // implemented so that you know what it does.
+ _logger.info("HDFS class path: " + _pathToDependencyJar);
+ if (_pathToDependencyJar != null) {
+ _logger.info("Copying jars locally.");
+ JobPreparationHelper.addDepsJarToDistributedCacheHelper(_fileSystem,
job, _pathToDependencyJar);
+ } else {
+ _logger.info("Property '{}' not specified.",
JobConfigConstants.PATH_TO_DEPS_JAR);
+ }
+
+ long startTime = System.currentTimeMillis();
+ // Submit the job for execution.
+ job.waitForCompletion(true);
+ if (!job.isSuccessful()) {
+ throw new RuntimeException("Job failed : " + job);
+ }
+
+ cleanup();
+ _logger.info("Finished pre-processing job in {}ms",
(System.currentTimeMillis() - startTime));
}
@Nullable
@@ -219,14 +432,14 @@ public class SegmentPreprocessingJob extends
BaseSegmentJob {
* @return Input schema
* @throws IOException exception when accessing to IO
*/
- private static Schema getSchema(Path inputPathDir)
+ private Schema getSchema(Path inputPathDir)
throws IOException {
FileSystem fs = FileSystem.get(new Configuration());
Schema avroSchema = null;
for (FileStatus fileStatus : fs.listStatus(inputPathDir)) {
if (fileStatus.isFile() &&
fileStatus.getPath().getName().endsWith(".avro")) {
_logger.info("Extracting schema from " + fileStatus.getPath());
- avroSchema = extractSchemaFromAvro(fileStatus.getPath());
+ avroSchema = _dataStreamReader.getSchema();
break;
}
}
@@ -234,20 +447,6 @@ public class SegmentPreprocessingJob extends
BaseSegmentJob {
}
/**
- * Extracts avro schema from avro file
- * @param avroFile Input avro file
- * @return Schema in avro file
- * @throws IOException exception when accessing to IO
- */
- private static Schema extractSchemaFromAvro(Path avroFile)
- throws IOException {
- DataFileStream<GenericRecord> dataStreamReader = getAvroReader(avroFile);
- Schema avroSchema = dataStreamReader.getSchema();
- dataStreamReader.close();
- return avroSchema;
- }
-
- /**
* Helper method that returns avro reader for the given avro file.
* If file name ends in 'gz' then returns the GZIP version, otherwise gives
the regular reader.
*
@@ -255,7 +454,7 @@ public class SegmentPreprocessingJob extends BaseSegmentJob
{
* @return Avro reader for the file.
* @throws IOException exception when accessing to IO
*/
- private static DataFileStream<GenericRecord> getAvroReader(Path avroFile)
+ private DataFileStream<GenericRecord> getAvroReader(Path avroFile)
throws IOException {
FileSystem fs = FileSystem.get(new Configuration());
if (avroFile.getName().endsWith("gz")) {
@@ -312,12 +511,14 @@ public class SegmentPreprocessingJob extends
BaseSegmentJob {
return fileName.endsWith(".avro");
}
- void cleanup() {
- _logger.info("Clean up pre-processing output path: {}",
_preprocessedOutputDir);
- try {
- _fileSystem.delete(_preprocessedOutputDir, true);
- } catch (IOException e) {
- _logger.error("Failed to clean up pre-processing output path: {}",
_preprocessedOutputDir);
- }
+ /**
+ * Releases resources held by the job. Currently this only closes the Avro reader; deleting the
+ * pre-processing output directory is disabled until it is moved after segment creation (see TODO).
+ *
+ * @throws IOException if closing the Avro reader fails
+ */
+ void cleanup() throws IOException {
+ // The reader is only assigned inside run(), so it may still be null when cleanup() is called.
+ if (_dataStreamReader != null) {
+ _dataStreamReader.close();
+ _dataStreamReader = null;
+ }
+ // TODO: Move this to post-segment creation
+// _logger.info("Clean up pre-processing output path: {}",
_preprocessedOutputDir);
+// try {
+// _fileSystem.delete(_preprocessedOutputDir, true);
+// } catch (IOException e) {
+// _logger.error("Failed to clean up pre-processing output path: {}",
_preprocessedOutputDir);
+// }
}
}
diff --git
a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
index 2c4d012..31d6f8e 100644
---
a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
+++
b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
@@ -20,6 +20,7 @@ package org.apache.pinot.hadoop.job.mappers;
import com.google.common.base.Preconditions;
import java.io.IOException;
+import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
@@ -29,24 +30,48 @@ import org.apache.avro.mapreduce.AvroJob;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.pinot.core.segment.name.NormalizedDateSegmentNameGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.pinot.hadoop.job.InternalConfigConstants.*;
import static org.apache.pinot.hadoop.job.JobConfigConstants.*;
public class SegmentPreprocessingMapper extends Mapper<AvroKey<GenericRecord>,
NullWritable, AvroKey<GenericRecord>, AvroValue<GenericRecord>> {
private static final Logger LOGGER =
LoggerFactory.getLogger(SegmentPreprocessingMapper.class);
private String _sortedColumn = null;
+ private String _timeColumn = null;
private Schema _outputKeySchema;
private Schema _outputSchema;
private boolean _enablePartition = false;
+ private String _sampleNormalizedTimeColumnValue = null;
+ private NormalizedDateSegmentNameGenerator
_normalizedDateSegmentNameGenerator = null;
+ private boolean _isAppend = false;
@Override
public void setup(final Context context) {
Configuration configuration = context.getConfiguration();
- String sortedColumn = configuration.get("sorted.column");
+ _isAppend = configuration.get(IS_APPEND).equalsIgnoreCase("true");
+
+ if (_isAppend) {
+ // Get time column name
+ _timeColumn = configuration.get(TIME_COLUMN_CONFIG);
+
+ // Get sample time column value
+ String timeColumnValue = configuration.get(TIME_COLUMN_VALUE);
+
+ String pushFrequency = configuration.get(SEGMENT_PUSH_FREQUENCY);
+ String timeType = configuration.get(SEGMENT_TIME_TYPE);
+ String timeFormat = configuration.get(SEGMENT_TIME_FORMAT);
+ TimeUnit timeUnit = TimeUnit.valueOf(timeType);
+ // Normalize time column value
+ _normalizedDateSegmentNameGenerator = new
NormalizedDateSegmentNameGenerator(pushFrequency, timeUnit, timeFormat);
+ _sampleNormalizedTimeColumnValue =
_normalizedDateSegmentNameGenerator.getNormalizedDate(timeColumnValue);
+ }
+
+ String sortedColumn = configuration.get(SORTED_COLUMN_CONFIG);
// Logging the configs for the mapper
LOGGER.info("Sorted Column: " + sortedColumn);
if (sortedColumn != null) {
@@ -61,6 +86,20 @@ public class SegmentPreprocessingMapper extends
Mapper<AvroKey<GenericRecord>, N
@Override
public void map(AvroKey<GenericRecord> record, NullWritable value, final
Context context)
throws IOException, InterruptedException {
+
+ if (_isAppend) {
+ // Normalize time column value and check against sample value
+ String timeColumnValue = (String) record.datum().get(_timeColumn);
+ String normalizedTimeColumnValue =
_normalizedDateSegmentNameGenerator.getNormalizedDate(timeColumnValue);
+
+ if (!normalizedTimeColumnValue.equals(_sampleNormalizedTimeColumnValue))
{
+ // TODO: Create a custom exception and gracefully catch this exception
outside, changing what the path to input
+ // into segment creation should be
+ throw new IllegalArgumentException("Your segment spans multiple time
units. Preprocess is not currently allowed for"
+ + " these use cases");
+ }
+ }
+
final GenericRecord inputRecord = record.datum();
final Schema schema = inputRecord.getSchema();
Preconditions.checkArgument(_outputSchema.equals(schema), "The schema of
all avro files should be the same!");
diff --git
a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java
b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java
index 43d1396..ca7e312 100644
---
a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java
+++
b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java
@@ -27,6 +27,8 @@ import org.apache.pinot.core.data.partition.PartitionFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.pinot.hadoop.job.InternalConfigConstants.*;
+
public class GenericPartitioner<T> extends Partitioner<T,
AvroValue<GenericRecord>> implements Configurable {
@@ -39,10 +41,10 @@ public class GenericPartitioner<T> extends Partitioner<T,
AvroValue<GenericRecor
@Override
public void setConf(Configuration conf) {
_configuration = conf;
- _partitionColumn = _configuration.get("partition.column");
- _numPartitions = Integer.parseInt(_configuration.get("num.partitions"));
+ _partitionColumn = _configuration.get(PARTITION_COLUMN_CONFIG);
+ _numPartitions =
Integer.parseInt(_configuration.get(NUM_PARTITIONS_CONFIG));
_partitionFunction =
-
PartitionFunctionFactory.getPartitionFunction(_configuration.get("partition.function",
null), _numPartitions);
+
PartitionFunctionFactory.getPartitionFunction(_configuration.get(PARTITION_FUNCTION_CONFIG,
null), _numPartitions);
LOGGER.info("The partition function is: " +
_partitionFunction.getClass().getName());
LOGGER.info("The partition column is: " + _partitionColumn);
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 42267fc..6bb7acd 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -75,6 +75,7 @@ public abstract class BaseClusterIntegrationTest extends
ClusterTest {
protected final File _tempDir = new File(FileUtils.getTempDirectory(),
getClass().getSimpleName());
protected final File _avroDir = new File(_tempDir, "avroDir");
+ protected final File _preprocessingDir = new File(_tempDir,
"preprocessingDir");
protected final File _segmentDir = new File(_tempDir, "segmentDir");
protected final File _tarDir = new File(_tempDir, "tarDir");
protected List<StreamDataServerStartable> _kafkaStarters;
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java
index 49903ef..7b39366 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HadoopSegmentBuildPushOfflineClusterIntegrationTest.java
@@ -19,6 +19,7 @@
package org.apache.pinot.integration.tests;
import java.io.File;
+import java.io.IOException;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
@@ -31,14 +32,20 @@ import org.apache.pinot.common.data.Schema;
import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
import org.apache.pinot.hadoop.job.JobConfigConstants;
import org.apache.pinot.hadoop.job.SegmentCreationJob;
+import org.apache.pinot.hadoop.job.SegmentPreprocessingJob;
import org.apache.pinot.hadoop.job.SegmentTarPushJob;
import org.apache.pinot.util.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import static org.apache.pinot.hadoop.job.JobConfigConstants.*;
+
public class HadoopSegmentBuildPushOfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(HadoopSegmentBuildPushOfflineClusterIntegrationTest.class);
private static final int NUM_BROKERS = 1;
private static final int NUM_SERVERS = 1;
@@ -66,7 +73,7 @@ public class
HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClu
// Start the MR Yarn cluster
final Configuration conf = new Configuration();
- _mrCluster = new MiniMRYarnCluster(getClass().getName(), 1);
+ _mrCluster = new MiniMRYarnCluster(getClass().getName(), 2);
_mrCluster.init(conf);
_mrCluster.start();
@@ -93,7 +100,7 @@ public class
HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClu
// Create the table
addOfflineTable(getTableName(), _schema.getTimeColumnName(),
_schema.getOutgoingTimeUnit().toString(), null, null,
- getLoadMode(), SegmentVersion.v3, getInvertedIndexColumns(),
getBloomFilterIndexColumns(), getTaskConfig(), null, null);
+ getLoadMode(), SegmentVersion.v3, getInvertedIndexColumns(),
getBloomFilterIndexColumns(), getTaskConfig(), getSegmentPartitionConfig(),
getSortedColumn());
// Generate and push Pinot segments from Hadoop
generateAndPushSegmentsFromHadoop();
@@ -161,6 +168,25 @@ public class
HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClu
properties.setProperty(JobConfigConstants.PUSH_TO_HOSTS,
getDefaultControllerConfiguration().getControllerHost());
properties.setProperty(JobConfigConstants.PUSH_TO_PORT,
getDefaultControllerConfiguration().getControllerPort());
+ Properties preComputeProperties = new Properties();
+ preComputeProperties.putAll(properties);
+ preComputeProperties.setProperty(ENABLE_PARTITIONING,
Boolean.TRUE.toString());
+ preComputeProperties.setProperty(ENABLE_SORTING, Boolean.TRUE.toString());
+
+ preComputeProperties.setProperty(JobConfigConstants.PATH_TO_INPUT,
_avroDir.getPath());
+
preComputeProperties.setProperty(JobConfigConstants.PREPROCESS_PATH_TO_OUTPUT,
_preprocessingDir.getPath());
+// properties.setProperty(JobConfigConstants.PATH_TO_INPUT,
_preprocessingDir.getPath());
+
+ // Run segment pre-processing job
+ SegmentPreprocessingJob segmentPreprocessingJob = new
SegmentPreprocessingJob(preComputeProperties);
+ Configuration preComputeConfig = _mrCluster.getConfig();
+ segmentPreprocessingJob.setConf(preComputeConfig);
+ segmentPreprocessingJob.run();
+ LOGGER.info("Segment preprocessing job finished.");
+
+ // Verify partitioning and sorting.
+ verifyPreprocessingJob(preComputeConfig);
+
// Run segment creation job
SegmentCreationJob creationJob = new SegmentCreationJob(properties);
Configuration config = _mrCluster.getConfig();
@@ -172,4 +198,47 @@ public class
HadoopSegmentBuildPushOfflineClusterIntegrationTest extends BaseClu
pushJob.setConf(_mrCluster.getConfig());
pushJob.run();
}
+
+ /**
+ * Placeholder for verifying the output of the pre-processing job.
+ *
+ * <p>The whole body is commented out until the pre-processing job is released (see the TODO), so
+ * this method currently does nothing. The commented-out code checks that the number of output files
+ * equals the number of partitions, that every record in a file maps to the same partition id, and
+ * that the sorted column is in ascending order within each file.
+ *
+ * @param preComputeConfig Hadoop configuration of the pre-processing job; unused while the body is
+ * commented out
+ * @throws IOException declared for the file system access in the commented-out body
+ */
+ private void verifyPreprocessingJob(Configuration preComputeConfig) throws
IOException {
+ // TODO: Uncomment once this job is released
+// // Fetch partitioning config and sorting config.
+// SegmentPartitionConfig segmentPartitionConfig =
getSegmentPartitionConfig();
+// Map.Entry<String, ColumnPartitionConfig>
+// entry =
segmentPartitionConfig.getColumnPartitionMap().entrySet().iterator().next();
+// String partitionColumn = entry.getKey();
+// String partitionFunctionString = entry.getValue().getFunctionName();
+// int numPartitions = entry.getValue().getNumPartitions();
+// PartitionFunction partitionFunction =
PartitionFunctionFactory.getPartitionFunction(partitionFunctionString,
numPartitions);
+// String sortedColumn = getSortedColumn();
+//
+// // Get output files.
+// FileSystem fileSystem = FileSystem.get(preComputeConfig);
+// FileStatus[] fileStatuses = fileSystem.listStatus(new
Path(_preprocessingDir.getPath()));
+// Assert.assertEquals(fileStatuses.length, numPartitions, "Number of
output file should be the same as the number of partitions.");
+//
+// Set<Integer> partitionIdSet = new HashSet<>();
+// Object previousObject;
+// for (FileStatus fileStatus : fileStatuses) {
+// Path avroFile = fileStatus.getPath();
+// DataFileStream<GenericRecord> dataFileStream = new
DataFileStream<>(fileSystem.open(avroFile), new GenericDatumReader<>());
+//
+// // Reset hash set and previous object
+// partitionIdSet.clear();
+// previousObject = null;
+// while (dataFileStream.hasNext()) {
+// GenericRecord genericRecord = dataFileStream.next();
+//
partitionIdSet.add(partitionFunction.getPartition(genericRecord.get(partitionColumn)));
+// Assert.assertEquals(partitionIdSet.size(), 1, "Partition Id should
be the same within a file.");
+// org.apache.avro.Schema sortedColumnSchema =
genericRecord.getSchema().getField(sortedColumn).schema();
+// Object currentObject = genericRecord.get(sortedColumn);
+// if (previousObject == null) {
+// previousObject = currentObject;
+// continue;
+// }
+// // The values of sorted column should be sorted in ascending order.
+// Assert.assertTrue(GenericData.get().compare(previousObject,
currentObject, sortedColumnSchema) <= 0);
+// previousObject = currentObject;
+// }
+// }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]