This is an automated email from the ASF dual-hosted git repository.
kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 7585656 [CARBONDATA-4096] SDK read fails from cluster and sdk read
filter query on sort column giving wrong result with IndexServer
7585656 is described below
commit 7585656b565d7eb76802fafcc84a28daba89b25e
Author: ShreelekhyaG <[email protected]>
AuthorDate: Tue Dec 22 18:44:43 2020 +0530
[CARBONDATA-4096] SDK read fails from cluster and sdk read filter query on
sort column giving wrong result with IndexServer
Why is this PR needed?
1. Creating a table and reading from SDK-written files fails in a cluster with
java.nio.file.NoSuchFileException:
hdfs:/hacluster/user/hive/warehouse/carbon.store/default/sdk.
2. After fixing the above path issue, a filter query on a sort column gives
the wrong result with IndexServer.
What changes were proposed in this PR?
1. In getAllDeleteDeltaFiles, used CarbonFile.listFiles instead of
Files.walk
to handle custom file types (such as the hdfs: path shown above).
2. In PruneWithFilter, isResolvedOnSegment is used in the filterResolver step.
The table and expression are now set on the executor side, so IndexServer can
use them in the filterResolver step.
This closes #4064
---
.../carbondata/core/index/IndexInputFormat.java | 8 ++++++--
.../hadoop/api/CarbonFileInputFormat.java | 23 ++++++++++++----------
2 files changed, 19 insertions(+), 12 deletions(-)
diff --git
a/core/src/main/java/org/apache/carbondata/core/index/IndexInputFormat.java
b/core/src/main/java/org/apache/carbondata/core/index/IndexInputFormat.java
index dbb5b4f..072dbbc 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/IndexInputFormat.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/IndexInputFormat.java
@@ -154,8 +154,12 @@ public class IndexInputFormat extends
FileInputFormat<Void, ExtendedBlocklet>
if (indexLevel == null) {
TableIndex defaultIndex = IndexStoreManager.getInstance()
.getIndex(table,
distributable.getDistributable().getIndexSchema());
- blocklets = defaultIndex
- .prune(segmentsToLoad, new IndexFilter(filterResolverIntf),
partitions);
+ IndexFilter filter = new IndexFilter(filterResolverIntf);
+ filter.setTable(table);
+ if (filterResolverIntf != null) {
+ filter.setExpression(filterResolverIntf.getFilterExpression());
+ }
+ blocklets = defaultIndex.prune(segmentsToLoad, filter, partitions);
blocklets = IndexUtil
.pruneIndexes(table, filterResolverIntf, segmentsToLoad,
partitions, blocklets,
indexChooser);
diff --git
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
index 2a655e8..91116b4 100644
---
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
+++
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -20,21 +20,17 @@ package org.apache.carbondata.hadoop.api;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.index.IndexFilter;
import org.apache.carbondata.core.index.Segment;
@@ -265,11 +261,18 @@ public class CarbonFileInputFormat<T> extends
CarbonInputFormat<T> implements Se
}
private List<String> getAllDeleteDeltaFiles(String path) {
- List<String> deltaFiles = null;
- try (Stream<Path> walk = Files.walk(Paths.get(path))) {
- deltaFiles = walk.map(x -> x.toString())
- .filter(f -> f.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT))
- .collect(Collectors.toList());
+ List<String> deltaFiles = new ArrayList<>();
+ try {
+ FileFactory.getCarbonFile(path).listFiles(true, new CarbonFileFilter() {
+ @Override
+ public boolean accept(CarbonFile file) {
+ if
(file.getName().endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)) {
+ deltaFiles.add(file.getAbsolutePath());
+ return true;
+ }
+ return false;
+ }
+ });
} catch (IOException e) {
throw new RuntimeException(e);
}