This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e71622d support for local.directory.sequence.id (#5927)
e71622d is described below
commit e71622d258c27fe28ef62398b143c1ee1145b0a4
Author: Yash Agarwal <[email protected]>
AuthorDate: Fri Aug 28 02:25:43 2020 +0530
support for local.directory.sequence.id (#5927)
* support for local.directory.sequence.id
* add null check
* Fix based on review comments
---
.../batch/common/SegmentGenerationTaskRunner.java | 3 +++
.../spark/SparkSegmentGenerationJobRunner.java | 31 ++++++++++++++++++++--
2 files changed, 32 insertions(+), 2 deletions(-)
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
index b84a299..c84835a 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
@@ -57,6 +57,9 @@ public class SegmentGenerationTaskRunner implements Serializable {
public static final String SEGMENT_NAME_PREFIX = "segment.name.prefix";
public static final String EXCLUDE_SEQUENCE_ID = "exclude.sequence.id";
+  // Assign sequence ids to input files at each local directory level
+  public static final String LOCAL_DIRECTORY_SEQUENCE_ID = "local.directory.sequence.id";
+
private SegmentGenerationTaskSpec _taskSpec;
public SegmentGenerationTaskRunner(SegmentGenerationTaskSpec taskSpec) {
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
index 2fb9a44..ad96e5d 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.plugin.ingestion.batch.spark;
+import static org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner.LOCAL_DIRECTORY_SEQUENCE_ID;
import static org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationUtils.PINOT_PLUGINS_DIR;
import static org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationUtils.PINOT_PLUGINS_TAR_GZ;
import static org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationUtils.getFileName;
@@ -29,11 +30,14 @@ import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.nio.file.FileSystems;
+import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import org.apache.commons.io.FileUtils;
@@ -200,8 +204,31 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Serializable {
}
List<String> pathAndIdxList = new ArrayList<>();
- for (int i = 0; i < filteredFiles.size(); i++) {
- pathAndIdxList.add(String.format("%s %d", filteredFiles.get(i), i));
+      String localDirectorySequenceIdString = _spec.getSegmentNameGeneratorSpec().getConfigs().get(LOCAL_DIRECTORY_SEQUENCE_ID);
+      boolean localDirectorySequenceId = false;
+      if (localDirectorySequenceIdString != null) {
+        localDirectorySequenceId = Boolean.parseBoolean(localDirectorySequenceIdString);
+      }
+ if (localDirectorySequenceId) {
+ Map<String, List<String>> localDirIndex = new HashMap<>();
+ for (String filteredFile : filteredFiles) {
+ Path filteredParentPath = Paths.get(filteredFile).getParent();
+ if (!localDirIndex.containsKey(filteredParentPath.toString())) {
+          localDirIndex.put(filteredParentPath.toString(), new ArrayList<>());
+ }
+ localDirIndex.get(filteredParentPath.toString()).add(filteredFile);
+ }
+ for (String parentPath: localDirIndex.keySet()){
+ List<String> siblingFiles = localDirIndex.get(parentPath);
+ Collections.sort(siblingFiles);
+ for (int i = 0; i < siblingFiles.size(); i++) {
+ pathAndIdxList.add(String.format("%s %d", siblingFiles.get(i), i));
+ }
+ }
+ } else {
+ for (int i = 0; i < filteredFiles.size(); i++) {
+ pathAndIdxList.add(String.format("%s %d", filteredFiles.get(i), i));
+ }
}
      JavaRDD<String> pathRDD = sparkContext.parallelize(pathAndIdxList, pathAndIdxList.size());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]