This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 18efd84  [CARBONDATA-3879] Filtering Segments Optimization
18efd84 is described below

commit 18efd84184773e8734eb8b83b1e065d5a14126c6
Author: haomarch <marchp...@126.com>
AuthorDate: Mon Jun 29 21:53:24 2020 +0800

    [CARBONDATA-3879] Filtering Segments Optimization
    
    Why is this PR needed?
    During the filter-segments flow there are many List.contains calls, which
    carry a heavy time overhead when there are tens of thousands of segments.
    For example, with 50000 segments, List.contains is triggered once per
    segment against a list of about 50000 elements, so the time complexity is
    O(50000 * 50000), i.e. roughly 2.5 billion equality checks.
    
    What changes were proposed in this PR?
    Change List.contains to Map.containsKey, reducing each membership lookup
    from O(n) to O(1).
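
    A minimal, self-contained sketch of the pattern change (the Segment
    record below is a hypothetical stand-in, not CarbonData's Segment class):

        import java.util.Arrays;
        import java.util.List;
        import java.util.Map;
        import java.util.stream.Collectors;

        public class SegmentLookupSketch {
          // Hypothetical stand-in for a segment, keyed by segment number.
          record Segment(String segmentNo) {}

          public static void main(String[] args) {
            List<Segment> segmentsToAccess =
                Arrays.asList(new Segment("0"), new Segment("1"), new Segment("2"));
            List<Segment> validSegments =
                Arrays.asList(new Segment("1"), new Segment("3"));

            // Before: List.contains scans the whole list for every valid
            // segment, so N segments against an N-element list is O(N * N).
            for (Segment valid : validSegments) {
              if (segmentsToAccess.contains(valid)) {
                System.out.println("list hit: " + valid.segmentNo());
              }
            }

            // After: build the map once in O(N); each Map.containsKey lookup
            // is then O(1), so the whole pass is O(N).
            Map<String, Segment> segmentMap = segmentsToAccess.stream()
                .collect(Collectors.toMap(Segment::segmentNo, s -> s, (e1, e2) -> e1));
            for (Segment valid : validSegments) {
              if (segmentMap.containsKey(valid.segmentNo())) {
                System.out.println("map hit: " + valid.segmentNo());
              }
            }
          }
        }

    The (e1, e2) -> e1 merge function mirrors the patch's handling of
    duplicate segment numbers by keeping the first occurrence.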
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No
    
    This closes #3816
---
 .../apache/carbondata/core/index/TableIndex.java   |  3 +-
 .../hadoop/api/CarbonTableInputFormat.java         | 61 +++++++++-------------
 2 files changed, 28 insertions(+), 36 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/index/TableIndex.java b/core/src/main/java/org/apache/carbondata/core/index/TableIndex.java
index a76b533..7aa5645 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/TableIndex.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/TableIndex.java
@@ -206,7 +206,8 @@ public final class TableIndex extends OperationEventListener {
       Set<Path> partitionLocations, List<ExtendedBlocklet> blocklets,
       Map<Segment, List<Index>> indexes) throws IOException {
     for (Segment segment : segments) {
-      if (indexes.get(segment).isEmpty() || indexes.get(segment) == null) {
+      if (segment == null ||
+          indexes.get(segment) == null || indexes.get(segment).isEmpty()) {
         continue;
       }
       boolean isExternalSegment = segment.getSegmentPath() != null;
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index bca03f8..2d06222 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -26,6 +26,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 
 import org.apache.carbondata.common.exceptions.DeprecatedFeatureException;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -64,6 +65,7 @@ import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
 
+import com.google.common.collect.Sets;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -232,47 +234,36 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
   private List<Segment> getFilteredSegment(JobContext job, List<Segment> validSegments,
       boolean validationRequired, ReadCommittedScope readCommittedScope) {
     Segment[] segmentsToAccess = getSegmentsToAccess(job, readCommittedScope);
-    List<Segment> segmentToAccessSet =
-        new ArrayList<>(new HashSet<>(Arrays.asList(segmentsToAccess)));
-    List<Segment> filteredSegmentToAccess = new ArrayList<>();
     if (segmentsToAccess.length == 0 || segmentsToAccess[0].getSegmentNo().equalsIgnoreCase("*")) {
-      filteredSegmentToAccess.addAll(validSegments);
-    } else {
-      for (Segment validSegment : validSegments) {
-        int index = segmentToAccessSet.indexOf(validSegment);
-        if (index > -1) {
-          // In case of in progress reading segment, segment file name is set to the property itself
-          if (segmentToAccessSet.get(index).getSegmentFileName() != null
-              && validSegment.getSegmentFileName() == null) {
-            filteredSegmentToAccess.add(segmentToAccessSet.get(index));
-          } else {
-            filteredSegmentToAccess.add(validSegment);
-          }
-        }
-      }
-      if (filteredSegmentToAccess.size() != segmentToAccessSet.size() && !validationRequired) {
-        for (Segment segment : segmentToAccessSet) {
-          if (!filteredSegmentToAccess.contains(segment)) {
-            filteredSegmentToAccess.add(segment);
-          }
+      return validSegments;
+    }
+    Map<String, Segment> segmentToAccessMap = Arrays.stream(segmentsToAccess)
+        .collect(Collectors.toMap(Segment::getSegmentNo, segment -> segment, (e1, e2) -> e1));
+    Map<String, Segment> filteredSegmentToAccess = new HashMap<>(segmentToAccessMap.size());
+    for (Segment validSegment : validSegments) {
+      String segmentNoOfValidSegment = validSegment.getSegmentNo();
+      if (segmentToAccessMap.containsKey(segmentNoOfValidSegment)) {
+        Segment segmentToAccess = segmentToAccessMap.get(segmentNoOfValidSegment);
+        if (segmentToAccess.getSegmentFileName() != null &&
+            validSegment.getSegmentFileName() == null) {
+          validSegment = segmentToAccess;
         }
+        filteredSegmentToAccess.put(segmentNoOfValidSegment, validSegment);
       }
-      // TODO: add validation for set segments access based on valid segments in table status
-      if (filteredSegmentToAccess.size() != segmentToAccessSet.size() && !validationRequired) {
-        for (Segment segment : segmentToAccessSet) {
-          if (!filteredSegmentToAccess.contains(segment)) {
-            filteredSegmentToAccess.add(segment);
-          }
+    }
+    if (!validationRequired && filteredSegmentToAccess.size() != segmentToAccessMap.size()) {
+      for (Segment segment : segmentToAccessMap.values()) {
+        if (!filteredSegmentToAccess.containsKey(segment.getSegmentNo())) {
+          filteredSegmentToAccess.put(segment.getSegmentNo(), segment);
         }
       }
-      if (!filteredSegmentToAccess.containsAll(segmentToAccessSet)) {
-        List<Segment> filteredSegmentToAccessTemp = new ArrayList<>(filteredSegmentToAccess);
-        filteredSegmentToAccessTemp.removeAll(segmentToAccessSet);
-        LOG.info(
-            "Segments ignored are : " + 
Arrays.toString(filteredSegmentToAccessTemp.toArray()));
-      }
     }
-    return filteredSegmentToAccess;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Segments ignored are : " +
+          Arrays.toString(Sets.difference(new HashSet<>(filteredSegmentToAccess.values()),
+          new HashSet<>(segmentToAccessMap.values())).toArray()));
+    }
+    return new ArrayList<>(filteredSegmentToAccess.values());
   }
 
  public List<InputSplit> getSplitsOfStreaming(JobContext job, List<Segment> streamSegments,
