This is an automated email from the ASF dual-hosted git repository. jackylk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new 7442c29 [CARBONDATA-3659] Fix issues with alluxio without host and port 7442c29 is described below commit 7442c293f75995e6bcc8390e82ac44a9d51d08b6 Author: ravipesala <ravi.pes...@gmail.com> AuthorDate: Mon Dec 30 11:37:16 2019 +0800 [CARBONDATA-3659] Fix issues with alluxio without host and port Why is this PR needed? When an Alluxio path is provided without host and port, like alluxio:///user/warehouse, carbon cannot read or write data because path comparison fails and extracting the parent path fails. What changes were proposed in this PR? Use Path objects to compare paths, and use string utilities to extract the parent path. Does this PR introduce any user interface change? No Is any new testcase added? No This closes #3571 --- .../core/datastore/block/TableBlockInfo.java | 2 +- .../filesystem/AbstractDFSCarbonFile.java | 24 ++++++++++++++++++++++ .../datastore/filesystem/AlluxioCarbonFile.java | 13 ++++++++++++ .../core/datastore/filesystem/LocalCarbonFile.java | 18 ++++++++++++++++ .../core/datastore/impl/FileFactory.java | 12 +++++++++++ .../indexstore/blockletindex/BlockDataMap.java | 9 +++++--- .../blockletindex/BlockletDataMapFactory.java | 19 ++++++++--------- .../blockletindex/SegmentIndexFileStore.java | 4 +--- .../carbondata/core/util/BlockletDataMapUtil.java | 9 ++++---- .../carbondata/core/util/path/CarbonTablePath.java | 3 ++- .../core/writer/CarbonIndexFileMergeWriter.java | 2 +- .../indexserver/DistributedRDDUtils.scala | 1 - 12 files changed, 91 insertions(+), 25 deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java index c18534c..15b2c50 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java @@ -306,7 +306,7 @@ public
class TableBlockInfo implements Distributable, Serializable { return false; } - if (!filePath.equals(other.filePath)) { + if (!new Path(filePath).equals(new Path(other.filePath))) { return false; } diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java index 51c4337..f7bc553 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java @@ -26,6 +26,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.util.ArrayList; import java.util.List; +import java.util.Objects; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; @@ -555,6 +556,29 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile { } @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + AbstractDFSCarbonFile that = (AbstractDFSCarbonFile) o; + if (path == null || that.path == null) { + return false; + } + return path.equals(that.path); + } + + @Override + public int hashCode() { + if (path == null) { + return 0; + } + return Objects.hash(path); + } + + @Override public long getLength() throws IOException { return fileSystem.getFileStatus(path).getLen(); } diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AlluxioCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AlluxioCarbonFile.java index 03ccd92..1a748af 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AlluxioCarbonFile.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AlluxioCarbonFile.java @@ -58,6 +58,19 @@ public class 
AlluxioCarbonFile extends HDFSCarbonFile { } @Override + public String getAbsolutePath() { + String absolutePath = super.getAbsolutePath(); + return getFormattedPath(absolutePath); + } + + public static String getFormattedPath(String absolutePath) { + if (absolutePath.startsWith("alluxio:/") && absolutePath.charAt(9) != '/') { + return absolutePath.replace("alluxio:/", "alluxio:///"); + } + return absolutePath; + } + + @Override public boolean renameForce(String changetoName) { try { if (fileSystem instanceof DistributedFileSystem) { diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java index 1bb8e03..da284c1 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java @@ -32,6 +32,7 @@ import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Objects; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; @@ -466,4 +467,21 @@ public class LocalCarbonFile implements CarbonFile { public long getLength() { return file.length(); } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + LocalCarbonFile that = (LocalCarbonFile) o; + return Objects.equals(file, that.file); + } + + @Override + public int hashCode() { + return Objects.hash(file); + } } diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java index d1d8c33..7b6aa34 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java +++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java @@ -36,6 +36,7 @@ import java.util.stream.Collectors; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.FileReader; +import org.apache.carbondata.core.datastore.filesystem.AlluxioCarbonFile; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.fileoperations.AtomicFileOperationFactory; import org.apache.carbondata.core.fileoperations.AtomicFileOperations; @@ -164,6 +165,17 @@ public final class FileFactory { return fileFileTypeInterface.getCarbonFile(path, getConfiguration()); } + /** + * Need carbonfile object path because depends on file format implementation + * path will be formatted. + */ + public static String getFormattedPath(String path) { + if (getFileType(path) == FileType.ALLUXIO) { + return AlluxioCarbonFile.getFormattedPath(path); + } + return path; + } + public static CarbonFile getCarbonFile(String path, Configuration hadoopConf) { return fileFileTypeInterface.getCarbonFile(path, hadoopConf); diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java index 6a4405e..be29e63 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java @@ -68,6 +68,7 @@ import org.apache.carbondata.core.util.ByteUtil; import org.apache.carbondata.core.util.DataFileFooterConverter; import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.commons.io.FilenameUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.Path; import org.apache.log4j.Logger; @@ -127,7 +128,7 @@ public class BlockDataMap extends 
CoarseGrainDataMap // when index info is already read and converted to data file footer object indexInfo = blockletDataMapInfo.getIndexInfos(); } - Path path = new Path(blockletDataMapInfo.getFilePath()); + String path = blockletDataMapInfo.getFilePath(); // store file path only in case of partition table, non transactional table and flat folder // structure byte[] filePath; @@ -137,12 +138,14 @@ public class BlockDataMap extends CoarseGrainDataMap // if the segment data is written in tablepath then no need to store whole path of file. !blockletDataMapInfo.getFilePath().startsWith( blockletDataMapInfo.getCarbonTable().getTablePath())) { - filePath = path.getParent().toString().getBytes(CarbonCommonConstants.DEFAULT_CHARSET); + filePath = FilenameUtils.getFullPathNoEndSeparator(path) + .getBytes(CarbonCommonConstants.DEFAULT_CHARSET); isFilePathStored = true; } else { filePath = new byte[0]; } - byte[] fileName = path.getName().getBytes(CarbonCommonConstants.DEFAULT_CHARSET); + byte[] fileName = path.substring(path.lastIndexOf("/") + 1, path.length()) + .getBytes(CarbonCommonConstants.DEFAULT_CHARSET); byte[] segmentId = blockletDataMapInfo.getSegmentId().getBytes(CarbonCommonConstants.DEFAULT_CHARSET); if (!indexInfo.isEmpty()) { diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java index 8e8a80b..c2c3647 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java @@ -61,8 +61,6 @@ import org.apache.carbondata.core.util.BlockletDataMapUtil; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.events.Event; -import org.apache.hadoop.fs.Path; - /** * Table map for blocklet */ @@ -393,21 +391,20 @@ public class 
BlockletDataMapFactory extends CoarseGrainDataMapFactory private List<TableBlockIndexUniqueIdentifierWrapper> getTableBlockIndexUniqueIdentifier( String indexFilePath, String segmentId) throws IOException { - Path indexPath = new Path(indexFilePath); List<TableBlockIndexUniqueIdentifierWrapper> identifiersWrapper = new ArrayList<>(); - if (indexPath.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) { - String parent = indexPath.getParent().toString(); + String parent = indexFilePath.substring(0, indexFilePath.lastIndexOf("/")); + String name = + indexFilePath.substring(indexFilePath.lastIndexOf("/") + 1, indexFilePath.length()); + if (indexFilePath.endsWith(CarbonTablePath.INDEX_FILE_EXT)) { identifiersWrapper.add(new TableBlockIndexUniqueIdentifierWrapper( - new TableBlockIndexUniqueIdentifier(parent, indexPath.getName(), null, segmentId), + new TableBlockIndexUniqueIdentifier(parent, name, null, segmentId), this.getCarbonTable())); - } else if (indexPath.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) { + } else if (indexFilePath.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) { SegmentIndexFileStore fileStore = new SegmentIndexFileStore(); - CarbonFile carbonFile = FileFactory.getCarbonFile(indexPath.toString()); - String parentPath = carbonFile.getParentFile().getAbsolutePath(); - List<String> indexFiles = fileStore.getIndexFilesFromMergeFile(carbonFile.getAbsolutePath()); + List<String> indexFiles = fileStore.getIndexFilesFromMergeFile(indexFilePath); for (String indexFile : indexFiles) { identifiersWrapper.add(new TableBlockIndexUniqueIdentifierWrapper( - new TableBlockIndexUniqueIdentifier(parentPath, indexFile, carbonFile.getName(), + new TableBlockIndexUniqueIdentifier(parent, indexFile, name, segmentId), this.getCarbonTable())); } } diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java 
index 73a001c..93068e8 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java @@ -305,9 +305,7 @@ public class SegmentIndexFileStore { try { dataInputStream.readFully(bytes); carbonIndexMap.put(indexFile.getName(), bytes); - carbonIndexMapWithFullPath.put( - indexFile.getParentFile().getAbsolutePath() + CarbonCommonConstants.FILE_SEPARATOR - + indexFile.getName(), bytes); + carbonIndexMapWithFullPath.put(indexFile.getAbsolutePath(), bytes); } finally { dataInputStream.close(); } diff --git a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java index eb064ed..f214637 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java @@ -63,6 +63,7 @@ import org.apache.carbondata.core.scan.executor.util.QueryUtil; import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.commons.io.FilenameUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; @@ -189,7 +190,7 @@ public class BlockletDataMapUtil { CarbonFile carbonFile = FileFactory.getCarbonFile(carbonDataFile); return new BlockMetaInfo(new String[] { "localhost" }, carbonFile.getSize()); default: - return fileNameToMetaInfoMapping.get(carbonDataFile); + return fileNameToMetaInfoMapping.get(FileFactory.getFormattedPath(carbonDataFile)); } } @@ -198,10 +199,10 @@ public class BlockletDataMapUtil { Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = new HashSet<>(); Map<String, String> indexFiles = segment.getCommittedIndexFile(); for (Map.Entry<String, String> indexFileEntry 
: indexFiles.entrySet()) { - Path indexFile = new Path(indexFileEntry.getKey()); + String indexFile = indexFileEntry.getKey(); tableBlockIndexUniqueIdentifiers.add( - new TableBlockIndexUniqueIdentifier(indexFile.getParent().toString(), indexFile.getName(), - indexFileEntry.getValue(), segment.getSegmentNo())); + new TableBlockIndexUniqueIdentifier(FilenameUtils.getFullPathNoEndSeparator(indexFile), + FilenameUtils.getName(indexFile), indexFileEntry.getValue(), segment.getSegmentNo())); } return tableBlockIndexUniqueIdentifiers; } diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java index 5e42c56..0be3642 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java +++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java @@ -180,7 +180,8 @@ public class CarbonTablePath { return file.getName().startsWith(SCHEMA_FILE); } }); - if (schemaFile != null && schemaFile.length > 0) { + if (schemaFile != null && schemaFile.length > 0 && + FileFactory.getFileType(tablePath) != FileFactory.FileType.ALLUXIO) { return schemaFile[0].getAbsolutePath(); } else { return metaPath + CarbonCommonConstants.FILE_SEPARATOR + SCHEMA_FILE; diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java index a80699f..35f6244 100644 --- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java +++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java @@ -239,7 +239,7 @@ public class CarbonIndexFileMergeWriter { location = segmentFileStore.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR + location; } - if (new Path(entry.getKey()).equals(new Path(location))) { + if 
(FileFactory.getCarbonFile(entry.getKey()).equals(FileFactory.getCarbonFile(location))) { segentry.getValue().setMergeFileName(mergeIndexFile); segentry.getValue().setFiles(new HashSet<String>()); break; diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala index bf5bc40..ebcfeea 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala @@ -36,7 +36,6 @@ import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.events.{IndexServerLoadEvent, OperationContext, OperationListenerBus} - object DistributedRDDUtils { private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)