This is an automated email from the ASF dual-hosted git repository.

jenniferdai pushed a commit to branch deleteExtras
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 3eeb7fdb8516f2895afc8205ade9ccaa6044b410
Author: Jennifer Dai <[email protected]>
AuthorDate: Tue Jul 23 13:59:19 2019 -0700

    Delete extra segments that are pushed
---
 .../pinot/hadoop/job/JobConfigConstants.java       |  2 ++
 .../apache/pinot/hadoop/job/SegmentTarPushJob.java | 34 +++++++++++++++++++++-
 2 files changed, 35 insertions(+), 1 deletion(-)

diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
index 19b350c..46d3d5b 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
@@ -57,4 +57,6 @@ public class JobConfigConstants {
   public static final String ENABLE_PARTITIONING = "enable.partitioning";
   public static final String ENABLE_SORTING = "enable.sorting";
   public static final String ENABLE_RESIZING = "enable.resizing";
+
+  public static final String DELETE_EXTRA_REFRESHED_SEGMENTS = 
"delete.extra.refreshed.segments";
 }
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java
index 7c71fd8..1b31107 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java
@@ -19,8 +19,12 @@
 package org.apache.pinot.hadoop.job;
 
 import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -30,6 +34,7 @@ import org.apache.pinot.hadoop.utils.PushLocation;
 public class SegmentTarPushJob extends BaseSegmentJob {
   private final Path _segmentPattern;
   private final List<PushLocation> _pushLocations;
+  private final boolean _deleteExtraSegments;
 
   public SegmentTarPushJob(Properties properties) {
     super(properties);
@@ -37,6 +42,7 @@ public class SegmentTarPushJob extends BaseSegmentJob {
     String[] hosts = 
StringUtils.split(properties.getProperty(JobConfigConstants.PUSH_TO_HOSTS), 
',');
     int port = 
Integer.parseInt(properties.getProperty(JobConfigConstants.PUSH_TO_PORT));
     _pushLocations = PushLocation.getPushLocations(hosts, port);
+    _deleteExtraSegments = 
Boolean.parseBoolean(properties.getProperty(JobConfigConstants.DELETE_EXTRA_REFRESHED_SEGMENTS,
 "false"));
   }
 
   @Override
@@ -48,7 +54,33 @@ public class SegmentTarPushJob extends BaseSegmentJob {
       throws Exception {
     FileSystem fileSystem = FileSystem.get(_conf);
     try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
-      controllerRestApi.pushSegments(fileSystem, 
getDataFilePaths(_segmentPattern));
+      // TODO: Deal with invalid prefixes in the future
+      if (_deleteExtraSegments) {
+        List<String> allSegments = controllerRestApi.getAllSegments("OFFLINE");
+        Set<String> uniqueSegmentPrefixes = new HashSet<>();
+
+        // Get all relevant segment prefixes that we are planning on pushing
+        List<Path> segmentsToPushPaths = getDataFilePaths(_segmentPattern);
+        List<String> segmentsToPushNames = segmentsToPushPaths.stream().map(s 
-> s.getName()).collect(Collectors.toList());
+        for (String segmentName : segmentsToPushNames) {
+          String segmentNamePrefix = 
segmentName.substring(segmentName.length() - 1);
+          uniqueSegmentPrefixes.add(segmentNamePrefix);
+        }
+
+        List<String> relevantSegments = new ArrayList<>();
+        // Get relevant segments already pushed that we are planning on refreshing
+        for (String segmentName : allSegments) {
+          if 
(uniqueSegmentPrefixes.contains(segmentName.substring(segmentName.length() - 
1))) {
+            relevantSegments.add(segmentName);
+          }
+        }
+
+        relevantSegments.removeAll(segmentsToPushNames);
+        controllerRestApi.pushSegments(fileSystem, 
getDataFilePaths(_segmentPattern));
+        controllerRestApi.deleteSegmentUris(relevantSegments);
+      } else {
+        controllerRestApi.pushSegments(fileSystem, 
getDataFilePaths(_segmentPattern));
+      }
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to