This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 7585656  [CARBONDATA-4096] SDK read fails from cluster and sdk read 
filter query on sort column giving wrong result with IndexServer
7585656 is described below

commit 7585656b565d7eb76802fafcc84a28daba89b25e
Author: ShreelekhyaG <[email protected]>
AuthorDate: Tue Dec 22 18:44:43 2020 +0530

    [CARBONDATA-4096] SDK read fails from cluster and sdk read filter query on
    sort column giving wrong result with IndexServer
    
    Why is this PR needed?
    1. Creating a table and reading from SDK-written files fails on a cluster with
    java.nio.file.NoSuchFileException:
    hdfs:/hacluster/user/hive/warehouse/carbon.store/default/sdk.
    2. After fixing the above path issue, a filter query on a sort column gives
    the wrong result with IndexServer.
    
    What changes were proposed in this PR?
    1. In getAllDeleteDeltaFiles, used CarbonFile.listFiles (obtained via
    FileFactory.getCarbonFile) instead of Files.walk to handle custom file
    system types such as the hdfs: path above.
    2. In PruneWithFilter, isResolvedOnSegment is used in the filterResolver step.
    The table and expression are now set on the executor side, so that the
    IndexServer can use them in the filterResolver step.
    
    This closes #4064
---
 .../carbondata/core/index/IndexInputFormat.java    |  8 ++++++--
 .../hadoop/api/CarbonFileInputFormat.java          | 23 ++++++++++++----------
 2 files changed, 19 insertions(+), 12 deletions(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/index/IndexInputFormat.java 
b/core/src/main/java/org/apache/carbondata/core/index/IndexInputFormat.java
index dbb5b4f..072dbbc 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/IndexInputFormat.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/IndexInputFormat.java
@@ -154,8 +154,12 @@ public class IndexInputFormat extends 
FileInputFormat<Void, ExtendedBlocklet>
         if (indexLevel == null) {
           TableIndex defaultIndex = IndexStoreManager.getInstance()
               .getIndex(table, 
distributable.getDistributable().getIndexSchema());
-          blocklets = defaultIndex
-              .prune(segmentsToLoad, new IndexFilter(filterResolverIntf), 
partitions);
+          IndexFilter filter = new IndexFilter(filterResolverIntf);
+          filter.setTable(table);
+          if (filterResolverIntf != null) {
+            filter.setExpression(filterResolverIntf.getFilterExpression());
+          }
+          blocklets = defaultIndex.prune(segmentsToLoad, filter, partitions);
           blocklets = IndexUtil
               .pruneIndexes(table, filterResolverIntf, segmentsToLoad, 
partitions, blocklets,
                   indexChooser);
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
index 2a655e8..91116b4 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -20,21 +20,17 @@ package org.apache.carbondata.hadoop.api;
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.index.IndexFilter;
 import org.apache.carbondata.core.index.Segment;
@@ -265,11 +261,18 @@ public class CarbonFileInputFormat<T> extends 
CarbonInputFormat<T> implements Se
   }
 
   private List<String> getAllDeleteDeltaFiles(String path) {
-    List<String> deltaFiles = null;
-    try (Stream<Path> walk = Files.walk(Paths.get(path))) {
-      deltaFiles = walk.map(x -> x.toString())
-          .filter(f -> f.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT))
-          .collect(Collectors.toList());
+    List<String> deltaFiles = new ArrayList<>();
+    try {
+      FileFactory.getCarbonFile(path).listFiles(true, new CarbonFileFilter() {
+        @Override
+        public boolean accept(CarbonFile file) {
+          if 
(file.getName().endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)) {
+            deltaFiles.add(file.getAbsolutePath());
+            return true;
+          }
+          return false;
+        }
+      });
     } catch (IOException e) {
       throw new RuntimeException(e);
     }

Reply via email to