This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 7442c29  [CARBONDATA-3659] Fix issues with alluxio without host and 
port
7442c29 is described below

commit 7442c293f75995e6bcc8390e82ac44a9d51d08b6
Author: ravipesala <ravi.pes...@gmail.com>
AuthorDate: Mon Dec 30 11:37:16 2019 +0800

    [CARBONDATA-3659] Fix issues with alluxio without host and port
    
    Why is this PR needed?
    
    When alluxio path is provided without host and port like 
alluxio:///user/warehouse then carbon cannot read or write data because of path 
comparison fails and extracting parent path fails.
    
    What changes were proposed in this PR?
    
    Use Path object to compare paths. And use string utils to extract the 
parent path.
    
    Does this PR introduce any user interface change?
    
    No
    
    Is any new testcase added?
    
    No
    
    This closes #3571
---
 .../core/datastore/block/TableBlockInfo.java       |  2 +-
 .../filesystem/AbstractDFSCarbonFile.java          | 24 ++++++++++++++++++++++
 .../datastore/filesystem/AlluxioCarbonFile.java    | 13 ++++++++++++
 .../core/datastore/filesystem/LocalCarbonFile.java | 18 ++++++++++++++++
 .../core/datastore/impl/FileFactory.java           | 12 +++++++++++
 .../indexstore/blockletindex/BlockDataMap.java     |  9 +++++---
 .../blockletindex/BlockletDataMapFactory.java      | 19 ++++++++---------
 .../blockletindex/SegmentIndexFileStore.java       |  4 +---
 .../carbondata/core/util/BlockletDataMapUtil.java  |  9 ++++----
 .../carbondata/core/util/path/CarbonTablePath.java |  3 ++-
 .../core/writer/CarbonIndexFileMergeWriter.java    |  2 +-
 .../indexserver/DistributedRDDUtils.scala          |  1 -
 12 files changed, 91 insertions(+), 25 deletions(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
index c18534c..15b2c50 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
@@ -306,7 +306,7 @@ public class TableBlockInfo implements Distributable, 
Serializable {
       return  false;
     }
 
-    if (!filePath.equals(other.filePath)) {
+    if (!new Path(filePath).equals(new Path(other.filePath))) {
       return false;
     }
 
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
index 51c4337..f7bc553 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
@@ -26,6 +26,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -555,6 +556,29 @@ public abstract class AbstractDFSCarbonFile implements 
CarbonFile {
   }
 
   @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    AbstractDFSCarbonFile that = (AbstractDFSCarbonFile) o;
+    if (path == null || that.path == null) {
+      return false;
+    }
+    return path.equals(that.path);
+  }
+
+  @Override
+  public int hashCode() {
+    if (path == null) {
+      return 0;
+    }
+    return Objects.hash(path);
+  }
+
+  @Override
   public long getLength() throws IOException {
     return fileSystem.getFileStatus(path).getLen();
   }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AlluxioCarbonFile.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AlluxioCarbonFile.java
index 03ccd92..1a748af 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AlluxioCarbonFile.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AlluxioCarbonFile.java
@@ -58,6 +58,19 @@ public class AlluxioCarbonFile extends HDFSCarbonFile {
   }
 
   @Override
+  public String getAbsolutePath() {
+    String absolutePath = super.getAbsolutePath();
+    return getFormattedPath(absolutePath);
+  }
+
+  public static String getFormattedPath(String absolutePath) {
+    if (absolutePath.startsWith("alluxio:/") && absolutePath.charAt(9) != '/') 
{
+      return absolutePath.replace("alluxio:/", "alluxio:///");
+    }
+    return absolutePath;
+  }
+
+  @Override
   public boolean renameForce(String changetoName) {
     try {
       if (fileSystem instanceof DistributedFileSystem) {
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
index 1bb8e03..da284c1 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
@@ -32,6 +32,7 @@ import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Objects;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
@@ -466,4 +467,21 @@ public class LocalCarbonFile implements CarbonFile {
   public long getLength() {
     return file.length();
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    LocalCarbonFile that = (LocalCarbonFile) o;
+    return Objects.equals(file, that.file);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(file);
+  }
 }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java 
b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
index d1d8c33..7b6aa34 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
@@ -36,6 +36,7 @@ import java.util.stream.Collectors;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.FileReader;
+import org.apache.carbondata.core.datastore.filesystem.AlluxioCarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperationFactory;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
@@ -164,6 +165,17 @@ public final class FileFactory {
     return fileFileTypeInterface.getCarbonFile(path, getConfiguration());
   }
 
+  /**
+   * Need carbonfile object path because depends on file format implementation
+   * path will be formatted.
+   */
+  public static String getFormattedPath(String path) {
+    if (getFileType(path) == FileType.ALLUXIO) {
+      return AlluxioCarbonFile.getFormattedPath(path);
+    }
+    return path;
+  }
+
   public static CarbonFile getCarbonFile(String path,
       Configuration hadoopConf) {
     return fileFileTypeInterface.getCarbonFile(path, hadoopConf);
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
index 6a4405e..be29e63 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
@@ -68,6 +68,7 @@ import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.DataFileFooterConverter;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
+import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
@@ -127,7 +128,7 @@ public class BlockDataMap extends CoarseGrainDataMap
       // when index info is already read and converted to data file footer 
object
       indexInfo = blockletDataMapInfo.getIndexInfos();
     }
-    Path path = new Path(blockletDataMapInfo.getFilePath());
+    String path = blockletDataMapInfo.getFilePath();
     // store file path only in case of partition table, non transactional 
table and flat folder
     // structure
     byte[] filePath;
@@ -137,12 +138,14 @@ public class BlockDataMap extends CoarseGrainDataMap
         // if the segment data is written in tablepath then no need to store 
whole path of file.
         !blockletDataMapInfo.getFilePath().startsWith(
             blockletDataMapInfo.getCarbonTable().getTablePath())) {
-      filePath = 
path.getParent().toString().getBytes(CarbonCommonConstants.DEFAULT_CHARSET);
+      filePath = FilenameUtils.getFullPathNoEndSeparator(path)
+              .getBytes(CarbonCommonConstants.DEFAULT_CHARSET);
       isFilePathStored = true;
     } else {
       filePath = new byte[0];
     }
-    byte[] fileName = 
path.getName().getBytes(CarbonCommonConstants.DEFAULT_CHARSET);
+    byte[] fileName = path.substring(path.lastIndexOf("/") + 1, path.length())
+        .getBytes(CarbonCommonConstants.DEFAULT_CHARSET);
     byte[] segmentId =
         
blockletDataMapInfo.getSegmentId().getBytes(CarbonCommonConstants.DEFAULT_CHARSET);
     if (!indexInfo.isEmpty()) {
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index 8e8a80b..c2c3647 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -61,8 +61,6 @@ import org.apache.carbondata.core.util.BlockletDataMapUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.events.Event;
 
-import org.apache.hadoop.fs.Path;
-
 /**
  * Table map for blocklet
  */
@@ -393,21 +391,20 @@ public class BlockletDataMapFactory extends 
CoarseGrainDataMapFactory
 
   private List<TableBlockIndexUniqueIdentifierWrapper> 
getTableBlockIndexUniqueIdentifier(
       String indexFilePath, String segmentId) throws IOException {
-    Path indexPath = new Path(indexFilePath);
     List<TableBlockIndexUniqueIdentifierWrapper> identifiersWrapper = new 
ArrayList<>();
-    if (indexPath.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
-      String parent = indexPath.getParent().toString();
+    String parent = indexFilePath.substring(0, indexFilePath.lastIndexOf("/"));
+    String name =
+        indexFilePath.substring(indexFilePath.lastIndexOf("/") + 1, 
indexFilePath.length());
+    if (indexFilePath.endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
       identifiersWrapper.add(new TableBlockIndexUniqueIdentifierWrapper(
-          new TableBlockIndexUniqueIdentifier(parent, indexPath.getName(), 
null, segmentId),
+          new TableBlockIndexUniqueIdentifier(parent, name, null, segmentId),
           this.getCarbonTable()));
-    } else if 
(indexPath.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+    } else if (indexFilePath.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
       SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
-      CarbonFile carbonFile = FileFactory.getCarbonFile(indexPath.toString());
-      String parentPath = carbonFile.getParentFile().getAbsolutePath();
-      List<String> indexFiles = 
fileStore.getIndexFilesFromMergeFile(carbonFile.getAbsolutePath());
+      List<String> indexFiles = 
fileStore.getIndexFilesFromMergeFile(indexFilePath);
       for (String indexFile : indexFiles) {
         identifiersWrapper.add(new TableBlockIndexUniqueIdentifierWrapper(
-            new TableBlockIndexUniqueIdentifier(parentPath, indexFile, 
carbonFile.getName(),
+            new TableBlockIndexUniqueIdentifier(parent, indexFile, name,
                 segmentId), this.getCarbonTable()));
       }
     }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
index 73a001c..93068e8 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
@@ -305,9 +305,7 @@ public class SegmentIndexFileStore {
     try {
       dataInputStream.readFully(bytes);
       carbonIndexMap.put(indexFile.getName(), bytes);
-      carbonIndexMapWithFullPath.put(
-          indexFile.getParentFile().getAbsolutePath() + 
CarbonCommonConstants.FILE_SEPARATOR
-              + indexFile.getName(), bytes);
+      carbonIndexMapWithFullPath.put(indexFile.getAbsolutePath(), bytes);
     } finally {
       dataInputStream.close();
     }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java 
b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
index eb064ed..f214637 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
@@ -63,6 +63,7 @@ import 
org.apache.carbondata.core.scan.executor.util.QueryUtil;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
+import org.apache.commons.io.FilenameUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
@@ -189,7 +190,7 @@ public class BlockletDataMapUtil {
         CarbonFile carbonFile = FileFactory.getCarbonFile(carbonDataFile);
         return new BlockMetaInfo(new String[] { "localhost" }, 
carbonFile.getSize());
       default:
-        return fileNameToMetaInfoMapping.get(carbonDataFile);
+        return 
fileNameToMetaInfoMapping.get(FileFactory.getFormattedPath(carbonDataFile));
     }
   }
 
@@ -198,10 +199,10 @@ public class BlockletDataMapUtil {
     Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = 
new HashSet<>();
     Map<String, String> indexFiles = segment.getCommittedIndexFile();
     for (Map.Entry<String, String> indexFileEntry : indexFiles.entrySet()) {
-      Path indexFile = new Path(indexFileEntry.getKey());
+      String indexFile = indexFileEntry.getKey();
       tableBlockIndexUniqueIdentifiers.add(
-          new 
TableBlockIndexUniqueIdentifier(indexFile.getParent().toString(), 
indexFile.getName(),
-              indexFileEntry.getValue(), segment.getSegmentNo()));
+          new 
TableBlockIndexUniqueIdentifier(FilenameUtils.getFullPathNoEndSeparator(indexFile),
+              FilenameUtils.getName(indexFile), indexFileEntry.getValue(), 
segment.getSegmentNo()));
     }
     return tableBlockIndexUniqueIdentifiers;
   }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java 
b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index 5e42c56..0be3642 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -180,7 +180,8 @@ public class CarbonTablePath {
         return file.getName().startsWith(SCHEMA_FILE);
       }
     });
-    if (schemaFile != null && schemaFile.length > 0) {
+    if (schemaFile != null && schemaFile.length > 0 &&
+        FileFactory.getFileType(tablePath) != FileFactory.FileType.ALLUXIO) {
       return schemaFile[0].getAbsolutePath();
     } else {
       return metaPath + CarbonCommonConstants.FILE_SEPARATOR + SCHEMA_FILE;
diff --git 
a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
 
b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
index a80699f..35f6244 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
@@ -239,7 +239,7 @@ public class CarbonIndexFileMergeWriter {
           location =
               segmentFileStore.getTablePath() + 
CarbonCommonConstants.FILE_SEPARATOR + location;
         }
-        if (new Path(entry.getKey()).equals(new Path(location))) {
+        if 
(FileFactory.getCarbonFile(entry.getKey()).equals(FileFactory.getCarbonFile(location)))
 {
           segentry.getValue().setMergeFileName(mergeIndexFile);
           segentry.getValue().setFiles(new HashSet<String>());
           break;
diff --git 
a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
 
b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
index bf5bc40..ebcfeea 100644
--- 
a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
@@ -36,7 +36,6 @@ import 
org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.{IndexServerLoadEvent, OperationContext, 
OperationListenerBus}
 
-
 object DistributedRDDUtils {
   private val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 

Reply via email to