This is an automated email from the ASF dual-hosted git repository. jenniferdai pushed a commit to branch deleteExtras in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 0660d007158414bd559f039754ed6a3babd3e9a4 Author: Jennifer Dai <[email protected]> AuthorDate: Tue Jul 23 13:59:19 2019 -0700 Delete extra segments that are pushed --- .../pinot/hadoop/job/JobConfigConstants.java | 2 + .../apache/pinot/hadoop/job/SegmentTarPushJob.java | 43 +++++++++++++++++++++- 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java index 19b350c..46d3d5b 100644 --- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java +++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java @@ -57,4 +57,6 @@ public class JobConfigConstants { public static final String ENABLE_PARTITIONING = "enable.partitioning"; public static final String ENABLE_SORTING = "enable.sorting"; public static final String ENABLE_RESIZING = "enable.resizing"; + + public static final String DELETE_EXTRA_REFRESHED_SEGMENTS = "delete.extra.refreshed.segments"; } diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java index 7c71fd8..9527b43 100644 --- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java +++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java @@ -19,8 +19,12 @@ package org.apache.pinot.hadoop.job; import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Properties; +import java.util.Set; +import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -30,6 +34,7 @@ import org.apache.pinot.hadoop.utils.PushLocation; public class SegmentTarPushJob extends BaseSegmentJob { private final Path _segmentPattern; private final List<PushLocation> _pushLocations; + private final boolean _deleteExtraSegments; public SegmentTarPushJob(Properties properties) { super(properties); @@ -37,6 +42,7 @@ public class SegmentTarPushJob extends BaseSegmentJob { String[] hosts = StringUtils.split(properties.getProperty(JobConfigConstants.PUSH_TO_HOSTS), ','); int port = Integer.parseInt(properties.getProperty(JobConfigConstants.PUSH_TO_PORT)); _pushLocations = PushLocation.getPushLocations(hosts, port); + _deleteExtraSegments = Boolean.parseBoolean(properties.getProperty(JobConfigConstants.DELETE_EXTRA_REFRESHED_SEGMENTS, "false")); } @Override @@ -48,10 +54,45 @@ public class SegmentTarPushJob extends BaseSegmentJob { throws Exception { FileSystem fileSystem = FileSystem.get(_conf); try (ControllerRestApi controllerRestApi = getControllerRestApi()) { - controllerRestApi.pushSegments(fileSystem, getDataFilePaths(_segmentPattern)); + // TODO: Deal with invalid prefixes in the future + if (_deleteExtraSegments) { + List<String> allSegments = controllerRestApi.getAllSegments("OFFLINE"); + Set<String> uniqueSegmentPrefixes = new HashSet<>(); + + // Get all relevant segment prefixes that we are planning on pushing + List<Path> segmentsToPushPaths = getDataFilePaths(_segmentPattern); + List<String> segmentsToPushNames = segmentsToPushPaths.stream().map(s -> s.getName()).collect(Collectors.toList()); + for (String segmentName : segmentsToPushNames) { + String segmentNamePrefix = removeSequenceId(segmentName); + uniqueSegmentPrefixes.add(segmentNamePrefix); + } + + List<String> relevantSegments = new ArrayList<>(); + // Get relevant segments already pushed that we are planning on refreshing + for (String segmentName : allSegments) { + if (uniqueSegmentPrefixes.contains(removeSequenceId(segmentName))) { + relevantSegments.add(segmentName); + } + } + + relevantSegments.removeAll(segmentsToPushNames); + controllerRestApi.pushSegments(fileSystem, getDataFilePaths(_segmentPattern)); + controllerRestApi.deleteSegmentUris(relevantSegments); + } else { + controllerRestApi.pushSegments(fileSystem, getDataFilePaths(_segmentPattern)); + } } } + /** + * Remove trailing sequence id + * @param segmentName + * @return + */ + private String removeSequenceId(String segmentName) { + return segmentName.replaceAll("\\d*$", ""); + } + protected ControllerRestApi getControllerRestApi() { return new DefaultControllerRestApi(_pushLocations, null); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
