This is an automated email from the ASF dual-hosted git repository.
jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 18efd84 [CARBONDATA-3879] Filtering Segments Optimization
18efd84 is described below
commit 18efd84184773e8734eb8b83b1e065d5a14126c6
Author: haomarch <[email protected]>
AuthorDate: Mon Jun 29 21:53:24 2020 +0800
[CARBONDATA-3879] Filtering Segments Optimization
Why is this PR needed?
During the filter-segments flow there are many List.contains calls, which
carry a heavy time overhead when there are tens of thousands of segments.
For example, if there are 50000 segments, List.contains is triggered for
each segment against a list of about 50000 elements, so the overall time
complexity is O(50000 * 50000).
What changes were proposed in this PR?
Change List.contains to Map.containsKey, turning each O(n) list scan into
an O(1) hash lookup (see the lookup-cost sketch after this commit message).
Does this PR introduce any user interface change?
No
Is any new testcase added?
No
This closes #3816
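
For illustration, a minimal, self-contained sketch of the lookup-cost
difference described above (plain JDK types, not CarbonData code; the
50000 count mirrors the example in the commit message):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class LookupCostSketch {
      public static void main(String[] args) {
        int segmentCount = 50000; // mirrors the 50000-segment example above
        List<String> segmentList = new ArrayList<>();
        Map<String, Boolean> segmentMap = new HashMap<>();
        for (int i = 0; i < segmentCount; i++) {
          segmentList.add(String.valueOf(i));
          segmentMap.put(String.valueOf(i), Boolean.TRUE);
        }

        // List.contains scans the list: O(n) per call, O(n^2) for n lookups.
        long t0 = System.nanoTime();
        for (int i = 0; i < segmentCount; i++) {
          segmentList.contains(String.valueOf(i));
        }
        long listMs = (System.nanoTime() - t0) / 1_000_000;

        // Map.containsKey hashes the key: O(1) per call, O(n) for n lookups.
        long t1 = System.nanoTime();
        for (int i = 0; i < segmentCount; i++) {
          segmentMap.containsKey(String.valueOf(i));
        }
        long mapMs = (System.nanoTime() - t1) / 1_000_000;

        System.out.println("List.contains: " + listMs
            + " ms, Map.containsKey: " + mapMs + " ms");
      }
    }

On a typical JVM the list loop is several orders of magnitude slower than
the map loop, which is the gap this PR targets.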
---
.../apache/carbondata/core/index/TableIndex.java | 3 +-
.../hadoop/api/CarbonTableInputFormat.java | 61 +++++++++-------------
2 files changed, 28 insertions(+), 36 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/index/TableIndex.java b/core/src/main/java/org/apache/carbondata/core/index/TableIndex.java
index a76b533..7aa5645 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/TableIndex.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/TableIndex.java
@@ -206,7 +206,8 @@ public final class TableIndex extends OperationEventListener {
       Set<Path> partitionLocations, List<ExtendedBlocklet> blocklets,
       Map<Segment, List<Index>> indexes) throws IOException {
     for (Segment segment : segments) {
-      if (indexes.get(segment).isEmpty() || indexes.get(segment) == null) {
+      if (segment == null ||
+          indexes.get(segment) == null || indexes.get(segment).isEmpty()) {
         continue;
       }
       boolean isExternalSegment = segment.getSegmentPath() != null;
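
A note on the hunk above: besides adding the segment null check, the fix
reorders the condition. The removed line calls isEmpty() on
indexes.get(segment) before checking it for null, so a segment with no map
entry throws NullPointerException instead of being skipped; || only
short-circuits when the null check comes first. A minimal sketch of the
corrected ordering (plain JDK types, not CarbonData classes):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class ShortCircuitSketch {
      public static void main(String[] args) {
        Map<String, List<String>> indexes = new HashMap<>(); // no entry for "0"
        String segment = "0";
        // The old order would throw NullPointerException here, because
        // indexes.get(segment).isEmpty() ran before the null comparison.
        // The new order is safe: == null short-circuits, so isEmpty()
        // is never invoked on a null value.
        if (indexes.get(segment) == null || indexes.get(segment).isEmpty()) {
          System.out.println("segment skipped safely");
        }
      }
    }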
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index bca03f8..2d06222 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -26,6 +26,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 
 import org.apache.carbondata.common.exceptions.DeprecatedFeatureException;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -64,6 +65,7 @@ import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
 
+import com.google.common.collect.Sets;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -232,47 +234,36 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
   private List<Segment> getFilteredSegment(JobContext job, List<Segment> validSegments,
       boolean validationRequired, ReadCommittedScope readCommittedScope) {
     Segment[] segmentsToAccess = getSegmentsToAccess(job, readCommittedScope);
-    List<Segment> segmentToAccessSet =
-        new ArrayList<>(new HashSet<>(Arrays.asList(segmentsToAccess)));
-    List<Segment> filteredSegmentToAccess = new ArrayList<>();
     if (segmentsToAccess.length == 0 || segmentsToAccess[0].getSegmentNo().equalsIgnoreCase("*")) {
-      filteredSegmentToAccess.addAll(validSegments);
-    } else {
-      for (Segment validSegment : validSegments) {
-        int index = segmentToAccessSet.indexOf(validSegment);
-        if (index > -1) {
-          // In case of in progress reading segment, segment file name is set to the property itself
-          if (segmentToAccessSet.get(index).getSegmentFileName() != null
-              && validSegment.getSegmentFileName() == null) {
-            filteredSegmentToAccess.add(segmentToAccessSet.get(index));
-          } else {
-            filteredSegmentToAccess.add(validSegment);
-          }
-        }
-      }
-      if (filteredSegmentToAccess.size() != segmentToAccessSet.size() && !validationRequired) {
-        for (Segment segment : segmentToAccessSet) {
-          if (!filteredSegmentToAccess.contains(segment)) {
-            filteredSegmentToAccess.add(segment);
-          }
+      return validSegments;
+    }
+    Map<String, Segment> segmentToAccessMap = Arrays.stream(segmentsToAccess)
+        .collect(Collectors.toMap(Segment::getSegmentNo, segment -> segment, (e1, e2) -> e1));
+    Map<String, Segment> filteredSegmentToAccess = new HashMap<>(segmentToAccessMap.size());
+    for (Segment validSegment : validSegments) {
+      String segmentNoOfValidSegment = validSegment.getSegmentNo();
+      if (segmentToAccessMap.containsKey(segmentNoOfValidSegment)) {
+        Segment segmentToAccess = segmentToAccessMap.get(segmentNoOfValidSegment);
+        if (segmentToAccess.getSegmentFileName() != null &&
+            validSegment.getSegmentFileName() == null) {
+          validSegment = segmentToAccess;
         }
+        filteredSegmentToAccess.put(segmentNoOfValidSegment, validSegment);
       }
-      // TODO: add validation for set segments access based on valid segments in table status
-      if (filteredSegmentToAccess.size() != segmentToAccessSet.size() && !validationRequired) {
-        for (Segment segment : segmentToAccessSet) {
-          if (!filteredSegmentToAccess.contains(segment)) {
-            filteredSegmentToAccess.add(segment);
-          }
+    }
+    if (!validationRequired && filteredSegmentToAccess.size() != segmentToAccessMap.size()) {
+      for (Segment segment : segmentToAccessMap.values()) {
+        if (!filteredSegmentToAccess.containsKey(segment.getSegmentNo())) {
+          filteredSegmentToAccess.put(segment.getSegmentNo(), segment);
         }
       }
-      if (!filteredSegmentToAccess.containsAll(segmentToAccessSet)) {
-        List<Segment> filteredSegmentToAccessTemp = new ArrayList<>(filteredSegmentToAccess);
-        filteredSegmentToAccessTemp.removeAll(segmentToAccessSet);
-        LOG.info(
-            "Segments ignored are : " + Arrays.toString(filteredSegmentToAccessTemp.toArray()));
-      }
     }
-    return filteredSegmentToAccess;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Segments ignored are : " +
+          Arrays.toString(Sets.difference(new HashSet<>(filteredSegmentToAccess.values()),
+              new HashSet<>(segmentToAccessMap.values())).toArray()));
+    }
+    return new ArrayList<>(filteredSegmentToAccess.values());
   }
 
   public List<InputSplit> getSplitsOfStreaming(JobContext job, List<Segment> streamSegments,
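
Two of the library calls introduced by this hunk are worth a standalone
illustration: the three-argument Collectors.toMap and Guava's
Sets.difference. A minimal sketch, with plain Strings standing in for
Segment objects:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.stream.Collectors;

    import com.google.common.collect.Sets;

    public class MapCollectSketch {
      public static void main(String[] args) {
        // The merge function (e1, e2) -> e1 keeps the first value when two
        // stream elements produce the same key; without it, a duplicate key
        // makes toMap throw IllegalStateException.
        String[] segmentNos = {"0", "1", "1", "2"};
        Map<String, String> bySegmentNo = Arrays.stream(segmentNos)
            .collect(Collectors.toMap(no -> no, no -> no, (e1, e2) -> e1));
        System.out.println(bySegmentNo.keySet()); // keys 0, 1, 2 (order not guaranteed)

        // Sets.difference returns an unmodifiable view of the elements of the
        // first set that are absent from the second, as used in the
        // ignored-segments debug log above.
        System.out.println(Sets.difference(
            new HashSet<>(Arrays.asList("0", "1", "2")),
            new HashSet<>(Arrays.asList("0", "2")))); // [1]
      }
    }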