This is an automated email from the ASF dual-hosted git repository. jenniferdai pushed a commit to branch paths in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 9a3988198ab9de3fc1bf01f215a6f321a0475fb6 Author: Jennifer Dai <j...@linkedin.com> AuthorDate: Fri Sep 6 10:42:03 2019 -0700 Disallowing multiple inputs in preprocess --- .../pinot/hadoop/job/SegmentPreprocessingJob.java | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java index 942fb6d..3d07245 100644 --- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java +++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java @@ -83,6 +83,7 @@ public class SegmentPreprocessingJob extends BaseSegmentJob { private String _partitionFunction; private String _sortedColumn; private int _numOutputFiles; + private boolean _isMultipleInput = false; private final Path _inputSegmentDir; private final Path _preprocessedOutputDir; @@ -101,8 +102,19 @@ public class SegmentPreprocessingJob extends BaseSegmentJob { _enablePreprocessing = Boolean.parseBoolean(_properties.getProperty(JobConfigConstants.ENABLE_PREPROCESSING)); - // get input/output paths. - _inputSegmentDir = Preconditions.checkNotNull(getPathFromProperty(JobConfigConstants.PATH_TO_INPUT)); + String inputPath = Preconditions.checkNotNull(properties.getProperty(JobConfigConstants.PATH_TO_INPUT)); + + // We cannot support this because mapreduce takes complete control of the output path. In order to support this, we + // would need control to pipe the exact folders we receive as input to multiple outputs. While we can programmatically + // determine record by record what goes into each output path, this does not support our use case. Each folder is a + // separate "day," and frequently, our customers will have two dates in one file, due to timezone of data, so we + // are not able to distinguish what is "today's" vs. "tomorrow's" data by solely looking at the record. + if (inputPath.split(",").length > 1) { + _isMultipleInput = true; + } + + // get input path/output paths. + _inputSegmentDir = getPathFromProperty(JobConfigConstants.PATH_TO_INPUT); _preprocessedOutputDir = getPathFromProperty(JobConfigConstants.PREPROCESS_PATH_TO_OUTPUT); _rawTableName = Preconditions.checkNotNull(_properties.getProperty(JobConfigConstants.SEGMENT_TABLE_NAME)); @@ -138,6 +150,11 @@ public class SegmentPreprocessingJob extends BaseSegmentJob { _logger.info("Starting {}", getClass().getSimpleName()); } + if (_isMultipleInput) { + _logger.info("Skipping pre-processing, multiple inputs detected. Not supported"); + return; + } + _fileSystem = FileSystem.get(_conf); final List<Path> inputDataPaths = getDataFilePaths(_inputSegmentDir); Preconditions.checkState(inputDataPaths.size() != 0, "No files in the input directory."); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org