This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e71622d  support for local.directory.sequence.id (#5927)
e71622d is described below

commit e71622d258c27fe28ef62398b143c1ee1145b0a4
Author: Yash Agarwal <[email protected]>
AuthorDate: Fri Aug 28 02:25:43 2020 +0530

    support for local.directory.sequence.id (#5927)
    
    * support for local.directory.sequence.id
    
    * add null check
    
    * Fix based on review comments
---
 .../batch/common/SegmentGenerationTaskRunner.java  |  3 +++
 .../spark/SparkSegmentGenerationJobRunner.java     | 31 ++++++++++++++++++++--
 2 files changed, 32 insertions(+), 2 deletions(-)

diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
index b84a299..c84835a 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
@@ -57,6 +57,9 @@ public class SegmentGenerationTaskRunner implements 
Serializable {
   public static final String SEGMENT_NAME_PREFIX = "segment.name.prefix";
   public static final String EXCLUDE_SEQUENCE_ID = "exclude.sequence.id";
 
 +  // Assign sequence ids to input files at each local directory level
+  public static final String LOCAL_DIRECTORY_SEQUENCE_ID = 
"local.directory.sequence.id";
+
   private SegmentGenerationTaskSpec _taskSpec;
 
   public SegmentGenerationTaskRunner(SegmentGenerationTaskSpec taskSpec) {
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
index 2fb9a44..ad96e5d 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.plugin.ingestion.batch.spark;
 
+import static 
org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner.LOCAL_DIRECTORY_SEQUENCE_ID;
 import static 
org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationUtils.PINOT_PLUGINS_DIR;
 import static 
org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationUtils.PINOT_PLUGINS_TAR_GZ;
 import static 
org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationUtils.getFileName;
@@ -29,11 +30,14 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.net.URI;
 import java.nio.file.FileSystems;
+import java.nio.file.Path;
 import java.nio.file.PathMatcher;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 
 import org.apache.commons.io.FileUtils;
@@ -200,8 +204,31 @@ public class SparkSegmentGenerationJobRunner implements 
IngestionJobRunner, Seri
       }
 
       List<String> pathAndIdxList = new ArrayList<>();
-      for (int i = 0; i < filteredFiles.size(); i++) {
-        pathAndIdxList.add(String.format("%s %d", filteredFiles.get(i), i));
+      String localDirectorySequenceIdString = 
_spec.getSegmentNameGeneratorSpec().getConfigs().get(LOCAL_DIRECTORY_SEQUENCE_ID);
+      boolean localDirectorySequenceId = false;
+      if (localDirectorySequenceIdString != null) {
+        localDirectorySequenceId = 
Boolean.parseBoolean(localDirectorySequenceIdString);
+      }
+      if (localDirectorySequenceId) {
+        Map<String, List<String>> localDirIndex = new HashMap<>();
+        for (String filteredFile : filteredFiles) {
+          Path filteredParentPath = Paths.get(filteredFile).getParent();
+          if (!localDirIndex.containsKey(filteredParentPath.toString())) {
+            localDirIndex.put(filteredParentPath.toString(), new 
ArrayList<>());
+          }
+          localDirIndex.get(filteredParentPath.toString()).add(filteredFile);
+        }
+        for (String parentPath: localDirIndex.keySet()){
+          List<String> siblingFiles = localDirIndex.get(parentPath);
+          Collections.sort(siblingFiles);
+          for (int i = 0; i < siblingFiles.size(); i++) {
+            pathAndIdxList.add(String.format("%s %d", siblingFiles.get(i), i));
+          }
+        }
+      } else {
+        for (int i = 0; i < filteredFiles.size(); i++) {
+          pathAndIdxList.add(String.format("%s %d", filteredFiles.get(i), i));
+        }
       }
       JavaRDD<String> pathRDD = sparkContext.parallelize(pathAndIdxList, 
pathAndIdxList.size());
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to