This is an automated email from the ASF dual-hosted git repository.

jenniferdai pushed a commit to branch paths
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 9a3988198ab9de3fc1bf01f215a6f321a0475fb6
Author: Jennifer Dai <j...@linkedin.com>
AuthorDate: Fri Sep 6 10:42:03 2019 -0700

    Disallowing multiple inputs in preprocess
---
 .../pinot/hadoop/job/SegmentPreprocessingJob.java   | 21 +++++++++++++++++++--
 1 file changed, 19 insertions(+), 2 deletions(-)

diff --git 
a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java
 
b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java
index 942fb6d..3d07245 100644
--- 
a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java
+++ 
b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentPreprocessingJob.java
@@ -83,6 +83,7 @@ public class SegmentPreprocessingJob extends BaseSegmentJob {
   private String _partitionFunction;
   private String _sortedColumn;
   private int _numOutputFiles;
+  private boolean _isMultipleInput = false;
 
   private final Path _inputSegmentDir;
   private final Path _preprocessedOutputDir;
@@ -101,8 +102,19 @@ public class SegmentPreprocessingJob extends 
BaseSegmentJob {
 
     _enablePreprocessing = 
Boolean.parseBoolean(_properties.getProperty(JobConfigConstants.ENABLE_PREPROCESSING));
 
-    // get input/output paths.
-    _inputSegmentDir = 
Preconditions.checkNotNull(getPathFromProperty(JobConfigConstants.PATH_TO_INPUT));
+    String inputPath = 
Preconditions.checkNotNull(properties.getProperty(JobConfigConstants.PATH_TO_INPUT));
+
+    // We cannot support multiple input paths because MapReduce takes complete control of the output path. To support
+    // them, we would need to pipe the exact folders we receive as input to multiple outputs. While we can
+    // programmatically determine, record by record, what goes into each output path, that does not cover our use case:
+    // each folder is a separate "day," and customers frequently have two dates in one file because of the data's
+    // timezone, so we cannot distinguish "today's" from "tomorrow's" data solely by looking at the record.
+    if (inputPath.split(",").length > 1) {
+      _isMultipleInput = true;
+    }
+
+    // get input path/output paths.
+    _inputSegmentDir = getPathFromProperty(JobConfigConstants.PATH_TO_INPUT);
     _preprocessedOutputDir = 
getPathFromProperty(JobConfigConstants.PREPROCESS_PATH_TO_OUTPUT);
     _rawTableName = 
Preconditions.checkNotNull(_properties.getProperty(JobConfigConstants.SEGMENT_TABLE_NAME));
 
@@ -138,6 +150,11 @@ public class SegmentPreprocessingJob extends 
BaseSegmentJob {
       _logger.info("Starting {}", getClass().getSimpleName());
     }
 
+    if (_isMultipleInput) {
+      _logger.info("Skipping pre-processing, multiple inputs detected. Not 
supported");
+      return;
+    }
+
     _fileSystem = FileSystem.get(_conf);
     final List<Path> inputDataPaths = getDataFilePaths(_inputSegmentDir);
     Preconditions.checkState(inputDataPaths.size() != 0, "No files in the 
input directory.");


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to