This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 9640e2de44d [fix](catalog) refactor location path and support default fs #39116 (#39203)
9640e2de44d is described below
commit 9640e2de44dd91f185358f5cf0b25ad184342903
Author: Mingyu Chen <[email protected]>
AuthorDate: Sat Aug 24 16:05:13 2024 +0800
[fix](catalog) refactor location path and support default fs #39116 (#39203)
---
.../org/apache/doris/common/util/LocationPath.java | 433 +++++++++++----------
.../apache/doris/datasource/FileQueryScanNode.java | 96 ++---
.../org/apache/doris/datasource/FileScanNode.java | 6 +-
.../org/apache/doris/datasource/FileSplit.java | 25 +-
.../org/apache/doris/datasource/SplitCreator.java | 5 +-
.../doris/datasource/hive/HiveMetaStoreCache.java | 30 +-
.../doris/datasource/hive/source/HiveScanNode.java | 20 +-
.../doris/datasource/hive/source/HiveSplit.java | 12 +-
.../hudi/source/COWIncrementalRelation.java | 11 +-
.../doris/datasource/hudi/source/HudiScanNode.java | 13 +-
.../doris/datasource/hudi/source/HudiSplit.java | 6 +-
.../datasource/iceberg/source/IcebergScanNode.java | 48 +--
.../datasource/iceberg/source/IcebergSplit.java | 20 +-
.../maxcompute/source/MaxComputeScanNode.java | 34 +-
.../maxcompute/source/MaxComputeSplit.java | 19 +-
.../datasource/paimon/source/PaimonScanNode.java | 22 +-
.../datasource/paimon/source/PaimonSource.java | 1 -
.../datasource/paimon/source/PaimonSplit.java | 20 +-
.../doris/datasource/tvf/source/TVFScanNode.java | 23 +-
.../org/apache/doris/planner/HiveTableSink.java | 2 +-
.../org/apache/doris/planner/IcebergTableSink.java | 2 +-
.../ExternalFileTableValuedFunction.java | 18 -
.../apache/doris/common/util/LocationPathTest.java | 20 +-
.../doris/planner/FederationBackendPolicyTest.java | 77 ++--
24 files changed, 433 insertions(+), 530 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
index eccb483578a..267e20a1f95 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
@@ -20,6 +20,7 @@ package org.apache.doris.common.util;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
+import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.property.constants.CosProperties;
import org.apache.doris.datasource.property.constants.ObsProperties;
import org.apache.doris.datasource.property.constants.OssProperties;
@@ -27,7 +28,9 @@ import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.fs.FileSystemType;
import org.apache.doris.thrift.TFileType;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
+import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
@@ -49,10 +52,11 @@ public class LocationPath {
private static final Logger LOG = LogManager.getLogger(LocationPath.class);
private static final String SCHEME_DELIM = "://";
private static final String NONSTANDARD_SCHEME_DELIM = ":/";
- private final LocationType locationType;
+ private final Scheme scheme;
private final String location;
+ private final boolean isBindBroker;
- public enum LocationType {
+ public enum Scheme {
HDFS,
LOCAL, // Local File
BOS, // Baidu
@@ -74,122 +78,230 @@ public class LocationPath {
NOSCHEME // no scheme info
}
- private LocationPath(String location) {
- this(location, Collections.emptyMap(), true);
+ @VisibleForTesting
+ public LocationPath(String location) {
+ this(location, Maps.newHashMap(), true);
}
public LocationPath(String location, Map<String, String> props) {
this(location, props, true);
}
-    public LocationPath(String location, Map<String, String> props, boolean convertPath) {
- String scheme = parseScheme(location).toLowerCase();
- if (scheme.isEmpty()) {
- locationType = LocationType.NOSCHEME;
- this.location = location;
- } else {
- switch (scheme) {
- case FeConstants.FS_PREFIX_HDFS:
- locationType = LocationType.HDFS;
- // Need add hdfs host to location
- String host = props.get(HdfsResource.DSF_NAMESERVICES);
-                    this.location = convertPath ? normalizedHdfsPath(location, host) : location;
- break;
- case FeConstants.FS_PREFIX_S3:
- locationType = LocationType.S3;
- this.location = location;
- break;
- case FeConstants.FS_PREFIX_S3A:
- locationType = LocationType.S3A;
-                    this.location = convertPath ? convertToS3(location) : location;
-                    break;
-                case FeConstants.FS_PREFIX_S3N:
-                    // include the check for multi locations and in a table, such as both s3 and hdfs are in a table.
-                    locationType = LocationType.S3N;
-                    this.location = convertPath ? convertToS3(location) : location;
- break;
- case FeConstants.FS_PREFIX_BOS:
- locationType = LocationType.BOS;
- // use s3 client to access
-                    this.location = convertPath ? convertToS3(location) : location;
-                    break;
-                case FeConstants.FS_PREFIX_GCS:
-                    locationType = LocationType.GCS;
-                    // use s3 client to access
-                    this.location = convertPath ? convertToS3(location) : location;
- break;
- case FeConstants.FS_PREFIX_OSS:
- if (isHdfsOnOssEndpoint(location)) {
- locationType = LocationType.OSS_HDFS;
- this.location = location;
- } else {
- if (useS3EndPoint(props)) {
-                            this.location = convertPath ? convertToS3(location) : location;
-                        } else {
-                            this.location = location;
-                        }
-                        locationType = LocationType.OSS;
-                    }
-                    break;
-                case FeConstants.FS_PREFIX_COS:
-                    if (useS3EndPoint(props)) {
-                        this.location = convertPath ? convertToS3(location) : location;
- } else {
- this.location = location;
- }
- locationType = LocationType.COS;
- break;
- case FeConstants.FS_PREFIX_OBS:
+    private LocationPath(String originLocation, Map<String, String> props, boolean convertPath) {
+        isBindBroker = props.containsKey(HMSExternalCatalog.BIND_BROKER_NAME);
+        String tmpLocation = originLocation;
+        if (!originLocation.contains(SCHEME_DELIM)) {
+            // Sometimes the file path does not contain scheme, need to add default fs
+            // eg, /path/to/file.parquet -> hdfs://nn/path/to/file.parquet
+            // the default fs is from the catalog properties
+            String defaultFS = props.getOrDefault(HdfsResource.HADOOP_FS_NAME, "");
+            tmpLocation = defaultFS + originLocation;
+        }
+ String scheme = parseScheme(tmpLocation).toLowerCase();
+ switch (scheme) {
+ case "":
+ this.scheme = Scheme.NOSCHEME;
+ break;
+ case FeConstants.FS_PREFIX_HDFS:
+ this.scheme = Scheme.HDFS;
+ // Need add hdfs host to location
+ String host = props.get(HdfsResource.DSF_NAMESERVICES);
+                tmpLocation = convertPath ? normalizedHdfsPath(tmpLocation, host) : tmpLocation;
+                break;
+            case FeConstants.FS_PREFIX_S3:
+                this.scheme = Scheme.S3;
+                break;
+            case FeConstants.FS_PREFIX_S3A:
+                this.scheme = Scheme.S3A;
+                tmpLocation = convertPath ? convertToS3(tmpLocation) : tmpLocation;
+                break;
+            case FeConstants.FS_PREFIX_S3N:
+                // include the check for multi locations and in a table, such as both s3 and hdfs are in a table.
+                this.scheme = Scheme.S3N;
+                tmpLocation = convertPath ? convertToS3(tmpLocation) : tmpLocation;
+ break;
+ case FeConstants.FS_PREFIX_BOS:
+ this.scheme = Scheme.BOS;
+ // use s3 client to access
+                tmpLocation = convertPath ? convertToS3(tmpLocation) : tmpLocation;
+                break;
+            case FeConstants.FS_PREFIX_GCS:
+                this.scheme = Scheme.GCS;
+                // use s3 client to access
+                tmpLocation = convertPath ? convertToS3(tmpLocation) : tmpLocation;
+ break;
+ case FeConstants.FS_PREFIX_OSS:
+ if (isHdfsOnOssEndpoint(tmpLocation)) {
+ this.scheme = Scheme.OSS_HDFS;
+ } else {
if (useS3EndPoint(props)) {
-                            this.location = convertPath ? convertToS3(location) : location;
-                        } else {
-                            this.location = location;
+                    tmpLocation = convertPath ? convertToS3(tmpLocation) : tmpLocation;
}
- locationType = LocationType.OBS;
- break;
- case FeConstants.FS_PREFIX_OFS:
- locationType = LocationType.OFS;
- this.location = location;
- break;
- case FeConstants.FS_PREFIX_JFS:
- locationType = LocationType.JFS;
- this.location = location;
- break;
- case FeConstants.FS_PREFIX_GFS:
- locationType = LocationType.GFS;
- this.location = location;
- break;
- case FeConstants.FS_PREFIX_COSN:
-                    // if treat cosn(tencent hadoop-cos) as a s3 file system, may bring incompatible issues
- locationType = LocationType.COSN;
- this.location = location;
- break;
- case FeConstants.FS_PREFIX_LAKEFS:
- locationType = LocationType.COSN;
- this.location = normalizedLakefsPath(location);
- break;
- case FeConstants.FS_PREFIX_VIEWFS:
- locationType = LocationType.VIEWFS;
- this.location = location;
- break;
- case FeConstants.FS_PREFIX_FILE:
- locationType = LocationType.LOCAL;
- this.location = location;
- break;
- default:
- locationType = LocationType.UNKNOWN;
- this.location = location;
- }
+ this.scheme = Scheme.OSS;
+ }
+ break;
+ case FeConstants.FS_PREFIX_COS:
+ if (useS3EndPoint(props)) {
+                    tmpLocation = convertPath ? convertToS3(tmpLocation) : tmpLocation;
+                }
+                this.scheme = Scheme.COS;
+                break;
+            case FeConstants.FS_PREFIX_OBS:
+                if (useS3EndPoint(props)) {
+                    tmpLocation = convertPath ? convertToS3(tmpLocation) : tmpLocation;
+ }
+ this.scheme = Scheme.OBS;
+ break;
+ case FeConstants.FS_PREFIX_OFS:
+ this.scheme = Scheme.OFS;
+ break;
+ case FeConstants.FS_PREFIX_JFS:
+ this.scheme = Scheme.JFS;
+ break;
+ case FeConstants.FS_PREFIX_GFS:
+ this.scheme = Scheme.GFS;
+ break;
+ case FeConstants.FS_PREFIX_COSN:
+                // if treat cosn(tencent hadoop-cos) as a s3 file system, may bring incompatible issues
+ this.scheme = Scheme.COSN;
+ break;
+ case FeConstants.FS_PREFIX_LAKEFS:
+ this.scheme = Scheme.COSN;
+ tmpLocation = normalizedLakefsPath(tmpLocation);
+ break;
+ case FeConstants.FS_PREFIX_VIEWFS:
+ this.scheme = Scheme.VIEWFS;
+ break;
+ case FeConstants.FS_PREFIX_FILE:
+ this.scheme = Scheme.LOCAL;
+ break;
+ default:
+ this.scheme = Scheme.UNKNOWN;
+ break;
+ }
+ this.location = tmpLocation;
+ }
+
+ // Return true if this location is with oss-hdfs
+ public static boolean isHdfsOnOssEndpoint(String location) {
+        // example: cn-shanghai.oss-dls.aliyuncs.com contains the "oss-dls.aliyuncs".
+        // https://www.alibabacloud.com/help/en/e-mapreduce/latest/oss-kusisurumen
+ return location.contains("oss-dls.aliyuncs");
+ }
+
+ // Return the file system type and the file system identity.
+    // The file system identity is the scheme and authority of the URI, eg. "hdfs://host:port" or "s3://bucket".
+    public static Pair<FileSystemType, String> getFSIdentity(String location, String bindBrokerName) {
+        LocationPath locationPath = new LocationPath(location, Collections.emptyMap(), true);
+        FileSystemType fsType = (bindBrokerName != null) ? FileSystemType.BROKER : locationPath.getFileSystemType();
+        URI uri = locationPath.getPath().toUri();
+        String fsIdent = Strings.nullToEmpty(uri.getScheme()) + "://" + Strings.nullToEmpty(uri.getAuthority());
+ return Pair.of(fsType, fsIdent);
+ }
+
+ /**
+ * provide file type for BE.
+ *
+ * @param location the location is from fs.listFile
+     * @return on BE, we will use TFileType to get the suitable client to access storage.
+     */
+    public static TFileType getTFileTypeForBE(String location) {
+        if (location == null || location.isEmpty()) {
+            return null;
+        }
+        LocationPath locationPath = new LocationPath(location, Collections.emptyMap(), false);
+        return locationPath.getTFileTypeForBE();
+    }
+
+    public static String getTempWritePath(String loc, String prefix) {
+        Path tempRoot = new Path(loc, prefix);
+        Path tempPath = new Path(tempRoot, UUID.randomUUID().toString().replace("-", ""));
+ return tempPath.toString();
+ }
+
+ public TFileType getTFileTypeForBE() {
+ switch (scheme) {
+ case S3:
+ case S3A:
+ case S3N:
+ case COS:
+ case OSS:
+ case OBS:
+ case BOS:
+ case GCS:
+            // ATTN, for COSN, on FE side, use HadoopFS to access, but on BE, use S3 client to access.
+            case COSN:
+            case LAKEFS:
+                // now we only support S3 client for object storage on BE
+                return TFileType.FILE_S3;
+            case HDFS:
+            case OSS_HDFS: // if hdfs service is enabled on oss, use hdfs lib to access oss.
+ case VIEWFS:
+ return TFileType.FILE_HDFS;
+ case GFS:
+ case JFS:
+ case OFS:
+ return TFileType.FILE_BROKER;
+ case LOCAL:
+ return TFileType.FILE_LOCAL;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * The converted path is used for BE
+ *
+ * @return BE scan range path
+ */
+ public Path toStorageLocation() {
+ switch (scheme) {
+ case S3:
+ case S3A:
+ case S3N:
+ case COS:
+ case OSS:
+ case OBS:
+ case BOS:
+ case GCS:
+ case COSN:
+                // All storage will use s3 client to access on BE, so need convert to s3
+ return new Path(convertToS3(location));
+ case HDFS:
+ case OSS_HDFS:
+ case VIEWFS:
+ case GFS:
+ case JFS:
+ case OFS:
+ case LOCAL:
+ default:
+ return getPath();
}
}
- private static String parseScheme(String location) {
+ public Scheme getScheme() {
+ return scheme;
+ }
+
+ public String get() {
+ return location;
+ }
+
+ public Path getPath() {
+ return new Path(location);
+ }
+
+ public boolean isBindBroker() {
+ return isBindBroker;
+ }
+
+ private static String parseScheme(String finalLocation) {
String scheme = "";
- String[] schemeSplit = location.split(SCHEME_DELIM);
+ String[] schemeSplit = finalLocation.split(SCHEME_DELIM);
if (schemeSplit.length > 1) {
scheme = schemeSplit[0];
} else {
- schemeSplit = location.split(NONSTANDARD_SCHEME_DELIM);
+ schemeSplit = finalLocation.split(NONSTANDARD_SCHEME_DELIM);
if (schemeSplit.length > 1) {
scheme = schemeSplit[0];
}
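For reference, a minimal usage sketch (not part of this commit) of the default-fs handling introduced above, assuming the constants and accessors shown in this hunk; the input/output pair follows the constructor's own comment:

    Map<String, String> props = Maps.newHashMap();
    props.put(HdfsResource.HADOOP_FS_NAME, "hdfs://nn");
    // Scheme-less input is prefixed with the default fs before scheme detection,
    // so (modulo normalizedHdfsPath's host handling) one would expect:
    LocationPath path = new LocationPath("/path/to/file.parquet", props);
    // path.get()       -> "hdfs://nn/path/to/file.parquet"
    // path.getScheme() -> LocationPath.Scheme.HDFS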
@@ -198,9 +310,9 @@ public class LocationPath {
// if not get scheme, need consider /path/to/local to no scheme
if (scheme.isEmpty()) {
try {
- Paths.get(location);
+ Paths.get(finalLocation);
} catch (InvalidPathException exception) {
-                throw new IllegalArgumentException("Fail to parse scheme, invalid location: " + location);
+                throw new IllegalArgumentException("Fail to parse scheme, invalid location: " + finalLocation);
}
}
@@ -217,14 +329,9 @@ public class LocationPath {
        return (props.containsKey(S3Properties.ENDPOINT) || props.containsKey(S3Properties.Env.ENDPOINT));
}
- public static boolean isHdfsOnOssEndpoint(String location) {
-        // example: cn-shanghai.oss-dls.aliyuncs.com contains the "oss-dls.aliyuncs".
-        // https://www.alibabacloud.com/help/en/e-mapreduce/latest/oss-kusisurumen
- return location.contains("oss-dls.aliyuncs");
- }
-
/**
- * The converted path is used for FE to get metadata
+ * The converted path is used for FE to get metadata.
+ * Change http://xxxx to s3://xxxx
*
* @param location origin location
     * @return metadata location path. just convert when storage is compatible with s3 client.
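A hedged sketch of the conversion documented above, based on the S3A branch of the new constructor (convertToS3 itself is private, so only the observable effect is shown):

    // With convertPath = true (the public constructors), an s3a:// location is
    // stored in converted form, so both accessors should reflect s3://.
    LocationPath p = new LocationPath("s3a://bucket/key/file.orc", Maps.newHashMap());
    // expected: p.get() and p.toStorageLocation().toString() begin with "s3://"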
@@ -291,17 +398,9 @@ public class LocationPath {
}
}
-    public static Pair<FileSystemType, String> getFSIdentity(String location, String bindBrokerName) {
-        LocationPath locationPath = new LocationPath(location);
-        FileSystemType fsType = (bindBrokerName != null) ? FileSystemType.BROKER : locationPath.getFileSystemType();
-        URI uri = locationPath.getPath().toUri();
-        String fsIdent = Strings.nullToEmpty(uri.getScheme()) + "://" + Strings.nullToEmpty(uri.getAuthority());
- return Pair.of(fsType, fsIdent);
- }
-
private FileSystemType getFileSystemType() {
FileSystemType fsType;
- switch (locationType) {
+ switch (scheme) {
case S3:
case S3A:
case S3N:
@@ -339,98 +438,6 @@ public class LocationPath {
return fsType;
}
- /**
- * provide file type for BE.
- *
- * @param location the location is from fs.listFile
-     * @return on BE, we will use TFileType to get the suitable client to access storage.
- */
- public static TFileType getTFileTypeForBE(String location) {
- if (location == null || location.isEmpty()) {
- return null;
- }
-        LocationPath locationPath = new LocationPath(location, Collections.emptyMap(), false);
- return locationPath.getTFileTypeForBE();
- }
-
- public TFileType getTFileTypeForBE() {
- switch (this.getLocationType()) {
- case S3:
- case S3A:
- case S3N:
- case COS:
- case OSS:
- case OBS:
- case BOS:
- case GCS:
-            // ATTN, for COSN, on FE side, use HadoopFS to access, but on BE, use S3 client to access.
-            case COSN:
-            case LAKEFS:
-                // now we only support S3 client for object storage on BE
-                return TFileType.FILE_S3;
-            case HDFS:
-            case OSS_HDFS: // if hdfs service is enabled on oss, use hdfs lib to access oss.
- case VIEWFS:
- return TFileType.FILE_HDFS;
- case GFS:
- case JFS:
- case OFS:
- return TFileType.FILE_BROKER;
- case LOCAL:
- return TFileType.FILE_LOCAL;
- default:
- return null;
- }
- }
-
- /**
- * The converted path is used for BE
- *
- * @return BE scan range path
- */
- public Path toStorageLocation() {
- switch (locationType) {
- case S3:
- case S3A:
- case S3N:
- case COS:
- case OSS:
- case OBS:
- case BOS:
- case GCS:
- case COSN:
-                // All storage will use s3 client to access on BE, so need convert to s3
- return new Path(convertToS3(location));
- case HDFS:
- case OSS_HDFS:
- case VIEWFS:
- case GFS:
- case JFS:
- case OFS:
- case LOCAL:
- default:
- return getPath();
- }
- }
-
- public LocationType getLocationType() {
- return locationType;
- }
-
- public String get() {
- return location;
- }
-
- public Path getPath() {
- return new Path(location);
- }
-
- public static String getTempWritePath(String loc, String prefix) {
- Path tempRoot = new Path(loc, prefix);
-        Path tempPath = new Path(tempRoot, UUID.randomUUID().toString().replace("-", ""));
- return tempPath.toString();
- }
-
@Override
public String toString() {
return get();
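A short sketch of the relocated static helpers, assuming the behavior documented in their comments above (identity = scheme + "://" + authority; the staging prefix below is hypothetical):

    Pair<FileSystemType, String> id =
            LocationPath.getFSIdentity("hdfs://nn:8020/warehouse/t/f.orc", null);
    // id.second -> "hdfs://nn:8020"; a non-null bindBrokerName forces FileSystemType.BROKER
    String tmp = LocationPath.getTempWritePath("hdfs://nn:8020/warehouse/t", ".staging");
    // tmp -> "hdfs://nn:8020/warehouse/t/.staging/<uuid-without-dashes>"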
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
index a5be5e56cd2..f8048b5fb66 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
@@ -35,10 +35,8 @@ import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.hive.AcidInfo;
import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo;
-import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.source.HiveScanNode;
import org.apache.doris.datasource.hive.source.HiveSplit;
-import org.apache.doris.datasource.iceberg.source.IcebergSplit;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.spi.Split;
@@ -268,7 +266,7 @@ public abstract class FileQueryScanNode extends FileScanNode {
boolean isWal = fileFormatType == TFileFormatType.FORMAT_WAL;
if (isCsvOrJson || isWal) {
params.setFileAttributes(getFileAttributes());
- if (getLocationType() == TFileType.FILE_STREAM) {
+ if (isFileStreamType()) {
params.setFileType(TFileType.FILE_STREAM);
                FunctionGenTable table = (FunctionGenTable) this.desc.getTable();
                ExternalFileTableValuedFunction tableValuedFunction = (ExternalFileTableValuedFunction) table.getTvf();
@@ -309,19 +307,13 @@ public abstract class FileQueryScanNode extends FileScanNode {
if (ConnectContext.get().getExecutor() != null) {
ConnectContext.get().getExecutor().getSummaryProfile().setGetSplitsFinishTime();
}
-        if (splitAssignment.getSampleSplit() == null && !(getLocationType() == TFileType.FILE_STREAM)) {
+        if (splitAssignment.getSampleSplit() == null && !isFileStreamType()) {
return;
}
selectedSplitNum = numApproximateSplits();
- TFileType locationType;
FileSplit fileSplit = (FileSplit) splitAssignment.getSampleSplit();
- if (fileSplit instanceof IcebergSplit
-                && ((IcebergSplit) fileSplit).getConfig().containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
- locationType = TFileType.FILE_BROKER;
- } else {
- locationType = getLocationType(fileSplit.getPath().toString());
- }
+ TFileType locationType = fileSplit.getLocationType();
totalFileSize = fileSplit.getLength() * selectedSplitNum;
        long maxWaitTime = ConnectContext.get().getSessionVariable().getFetchSplitsMaxWaitTime();
        // Not accurate, only used to estimate concurrency.
@@ -351,7 +343,7 @@ public abstract class FileQueryScanNode extends FileScanNode {
ConnectContext.get().getExecutor().getSummaryProfile().setGetSplitsFinishTime();
}
selectedSplitNum = inputSplits.size();
-        if (inputSplits.isEmpty() && !(getLocationType() == TFileType.FILE_STREAM)) {
+        if (inputSplits.isEmpty() && !isFileStreamType()) {
            return;
        }
        Multimap<Backend, Split> assignment = backendPolicy.computeScanRangeAssignment(inputSplits);
@@ -379,14 +371,6 @@ public abstract class FileQueryScanNode extends FileScanNode {
Split split,
List<String> pathPartitionKeys) throws UserException {
FileSplit fileSplit = (FileSplit) split;
- TFileType locationType;
- if (fileSplit instanceof IcebergSplit
-                && ((IcebergSplit) fileSplit).getConfig().containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
- locationType = TFileType.FILE_BROKER;
- } else {
- locationType = getLocationType(fileSplit.getPath().toString());
- }
-
TScanRangeLocations curLocations = newLocations();
        // If fileSplit has partition values, use the values collected from hive partitions.
        // Otherwise, use the values in file path.
@@ -396,41 +380,42 @@ public abstract class FileQueryScanNode extends FileScanNode {
isACID = hiveSplit.isACID();
}
        List<String> partitionValuesFromPath = fileSplit.getPartitionValues() == null
-                ? BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys,
+                ? BrokerUtil.parseColumnsFromPath(fileSplit.getPathString(), pathPartitionKeys,
                false, isACID) : fileSplit.getPartitionValues();
-        TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys,
-                locationType);
+        TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys);
TFileCompressType fileCompressType = getFileCompressType(fileSplit);
rangeDesc.setCompressType(fileCompressType);
- if (isACID) {
- HiveSplit hiveSplit = (HiveSplit) fileSplit;
- hiveSplit.setTableFormatType(TableFormatType.TRANSACTIONAL_HIVE);
-            TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
-            tableFormatFileDesc.setTableFormatType(hiveSplit.getTableFormatType().value());
-            AcidInfo acidInfo = (AcidInfo) hiveSplit.getInfo();
-            TTransactionalHiveDesc transactionalHiveDesc = new TTransactionalHiveDesc();
-            transactionalHiveDesc.setPartition(acidInfo.getPartitionLocation());
-            List<TTransactionalHiveDeleteDeltaDesc> deleteDeltaDescs = new ArrayList<>();
-            for (DeleteDeltaInfo deleteDeltaInfo : acidInfo.getDeleteDeltas()) {
-                TTransactionalHiveDeleteDeltaDesc deleteDeltaDesc = new TTransactionalHiveDeleteDeltaDesc();
-                deleteDeltaDesc.setDirectoryLocation(deleteDeltaInfo.getDirectoryLocation());
- deleteDeltaDesc.setFileNames(deleteDeltaInfo.getFileNames());
- deleteDeltaDescs.add(deleteDeltaDesc);
+ if (fileSplit instanceof HiveSplit) {
+ if (isACID) {
+ HiveSplit hiveSplit = (HiveSplit) fileSplit;
+                hiveSplit.setTableFormatType(TableFormatType.TRANSACTIONAL_HIVE);
+                TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
+                tableFormatFileDesc.setTableFormatType(hiveSplit.getTableFormatType().value());
+                AcidInfo acidInfo = (AcidInfo) hiveSplit.getInfo();
+                TTransactionalHiveDesc transactionalHiveDesc = new TTransactionalHiveDesc();
+                transactionalHiveDesc.setPartition(acidInfo.getPartitionLocation());
+                List<TTransactionalHiveDeleteDeltaDesc> deleteDeltaDescs = new ArrayList<>();
+                for (DeleteDeltaInfo deleteDeltaInfo : acidInfo.getDeleteDeltas()) {
+                    TTransactionalHiveDeleteDeltaDesc deleteDeltaDesc = new TTransactionalHiveDeleteDeltaDesc();
+                    deleteDeltaDesc.setDirectoryLocation(deleteDeltaInfo.getDirectoryLocation());
+                    deleteDeltaDesc.setFileNames(deleteDeltaInfo.getFileNames());
+                    deleteDeltaDescs.add(deleteDeltaDesc);
+                }
+                transactionalHiveDesc.setDeleteDeltas(deleteDeltaDescs);
+                tableFormatFileDesc.setTransactionalHiveParams(transactionalHiveDesc);
+                rangeDesc.setTableFormatParams(tableFormatFileDesc);
+            } else {
+                TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
+                tableFormatFileDesc.setTableFormatType(TableFormatType.HIVE.value());
+                rangeDesc.setTableFormatParams(tableFormatFileDesc);
}
- transactionalHiveDesc.setDeleteDeltas(deleteDeltaDescs);
-            tableFormatFileDesc.setTransactionalHiveParams(transactionalHiveDesc);
-            rangeDesc.setTableFormatParams(tableFormatFileDesc);
-        } else if (fileSplit instanceof HiveSplit) {
-            TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
-            tableFormatFileDesc.setTableFormatType(TableFormatType.HIVE.value());
- rangeDesc.setTableFormatParams(tableFormatFileDesc);
}
setScanParams(rangeDesc, fileSplit);
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
TScanRangeLocation location = new TScanRangeLocation();
-        setLocationPropertiesIfNecessary(backend, locationType, locationProperties);
+        setLocationPropertiesIfNecessary(backend, fileSplit.getLocationType(), locationProperties);
        location.setBackendId(backend.getId());
        location.setServer(new TNetworkAddress(backend.getHost(), backend.getBePort()));
curLocations.addToLocations(location);
@@ -493,8 +478,7 @@ public abstract class FileQueryScanNode extends FileScanNode {
    }
    private TFileRangeDesc createFileRangeDesc(FileSplit fileSplit, List<String> columnsFromPath,
-                                               List<String> columnsFromPathKeys, TFileType locationType)
-            throws UserException {
+                                               List<String> columnsFromPathKeys) {
TFileRangeDesc rangeDesc = new TFileRangeDesc();
rangeDesc.setStartOffset(fileSplit.getStart());
rangeDesc.setSize(fileSplit.getLength());
@@ -504,10 +488,10 @@ public abstract class FileQueryScanNode extends FileScanNode {
rangeDesc.setColumnsFromPath(columnsFromPath);
rangeDesc.setColumnsFromPathKeys(columnsFromPathKeys);
- rangeDesc.setFileType(locationType);
- rangeDesc.setPath(fileSplit.getPath().toString());
- if (locationType == TFileType.FILE_HDFS) {
- URI fileUri = fileSplit.getPath().toUri();
+ rangeDesc.setFileType(fileSplit.getLocationType());
+ rangeDesc.setPath(fileSplit.getPath().toStorageLocation().toString());
+ if (fileSplit.getLocationType() == TFileType.FILE_HDFS) {
+ URI fileUri = fileSplit.getPath().getPath().toUri();
            rangeDesc.setFsName(fileUri.getScheme() + "://" + fileUri.getAuthority());
}
rangeDesc.setModificationTime(fileSplit.getModificationTime());
@@ -555,14 +539,16 @@ public abstract class FileQueryScanNode extends FileScanNode {
return scanRangeLocations.size();
}
- protected abstract TFileType getLocationType() throws UserException;
-
-    protected abstract TFileType getLocationType(String location) throws UserException;
+ // Return true if this is a TFileType.FILE_STREAM type.
+ // Currently, only TVFScanNode may be TFileType.FILE_STREAM type.
+ protected boolean isFileStreamType() throws UserException {
+ return false;
+ }
    protected abstract TFileFormatType getFileFormatType() throws UserException;
    protected TFileCompressType getFileCompressType(FileSplit fileSplit) throws UserException {
-        return Util.inferFileCompressTypeByPath(fileSplit.getPath().toString());
+ return Util.inferFileCompressTypeByPath(fileSplit.getPathString());
}
protected TFileAttributes getFileAttributes() throws UserException {
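Sketch of the hook that replaces the two getLocationType() abstract methods, assuming a stream-backed subclass (per the comment above, only TVFScanNode is expected to override it; the condition here is hypothetical):

    @Override
    protected boolean isFileStreamType() {
        // hypothetical: true only when this TVF reads an in-memory stream
        return true;
    }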
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
index fda93c0c3fa..2868d6ebcf8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
@@ -25,6 +25,7 @@ import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.LocationPath;
import org.apache.doris.common.util.Util;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
@@ -46,7 +47,6 @@ import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -241,14 +241,14 @@ public abstract class FileScanNode extends ExternalScanNode {
}
}
-    protected List<Split> splitFile(Path path, long blockSize, BlockLocation[] blockLocations, long length,
+    protected List<Split> splitFile(LocationPath path, long blockSize, BlockLocation[] blockLocations, long length,
            long modificationTime, boolean splittable, List<String> partitionValues, SplitCreator splitCreator)
throws IOException {
if (blockLocations == null) {
blockLocations = new BlockLocation[0];
}
List<Split> result = Lists.newArrayList();
-        TFileCompressType compressType = Util.inferFileCompressTypeByPath(path.toString());
+        TFileCompressType compressType = Util.inferFileCompressTypeByPath(path.get());
if (!splittable || compressType != TFileCompressType.PLAIN) {
if (LOG.isDebugEnabled()) {
LOG.debug("Path {} is not splittable.", path);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java
index b02e8be0cd7..7eaa87b74aa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java
@@ -17,16 +17,17 @@
package org.apache.doris.datasource;
+import org.apache.doris.common.util.LocationPath;
import org.apache.doris.spi.Split;
+import org.apache.doris.thrift.TFileType;
import lombok.Data;
-import org.apache.hadoop.fs.Path;
import java.util.List;
@Data
public class FileSplit implements Split {
- public Path path;
+ public LocationPath path;
public long start;
// length of this split, in bytes
public long length;
@@ -43,27 +44,30 @@ public class FileSplit implements Split {
public List<String> partitionValues;
public List<String> alternativeHosts;
+ // the location type for BE, eg: HDFS, LOCAL, S3
+ protected TFileType locationType;
-    public FileSplit(Path path, long start, long length, long fileLength,
+    public FileSplit(LocationPath path, long start, long length, long fileLength,
            long modificationTime, String[] hosts, List<String> partitionValues) {
this.path = path;
this.start = start;
this.length = length;
this.fileLength = fileLength;
- this.modificationTime = modificationTime;
+ // BE requires modification time to be non-negative.
+ this.modificationTime = modificationTime < 0 ? 0 : modificationTime;
this.hosts = hosts == null ? new String[0] : hosts;
this.partitionValues = partitionValues;
- }
-
- public FileSplit(Path path, long start, long length, long fileLength,
- String[] hosts, List<String> partitionValues) {
- this(path, start, length, fileLength, 0, hosts, partitionValues);
+        this.locationType = path.isBindBroker() ? TFileType.FILE_BROKER : path.getTFileTypeForBE();
}
public String[] getHosts() {
return hosts;
}
+ public TFileType getLocationType() {
+ return locationType;
+ }
+
@Override
public Object getInfo() {
return null;
@@ -79,7 +83,8 @@ public class FileSplit implements Split {
public static final FileSplitCreator DEFAULT = new FileSplitCreator();
@Override
-        public Split create(Path path, long start, long length, long fileLength, long modificationTime, String[] hosts,
+        public Split create(LocationPath path, long start, long length, long fileLength,
+                long modificationTime, String[] hosts,
                List<String> partitionValues) {
            return new FileSplit(path, start, length, fileLength, modificationTime, hosts, partitionValues);
}
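Sketch of the new FileSplit contract; the values in the comments follow the constructor logic shown in this diff, with hypothetical inputs:

    Map<String, String> props = Maps.newHashMap(); // hypothetical catalog properties
    LocationPath lp = new LocationPath("hdfs://nn/warehouse/t/f.parquet", props);
    FileSplit split = new FileSplit(lp, 0, 1024, 1024, -1, null, null);
    // -1 is clamped to 0 (BE requires a non-negative modification time), and
    // split.getLocationType() resolves to TFileType.FILE_HDFS; a LocationPath
    // built with HMSExternalCatalog.BIND_BROKER_NAME set would yield FILE_BROKER.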
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitCreator.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitCreator.java
index 095a9a5eccc..4df30459db7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitCreator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitCreator.java
@@ -17,13 +17,12 @@
package org.apache.doris.datasource;
+import org.apache.doris.common.util.LocationPath;
import org.apache.doris.spi.Split;
-import org.apache.hadoop.fs.Path;
-
import java.util.List;
public interface SplitCreator {
- Split create(Path path, long start, long length, long fileLength,
+ Split create(LocationPath path, long start, long length, long fileLength,
            long modificationTime, String[] hosts, List<String> partitionValues);
}
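Usage sketch for the updated interface, assuming FileSplitCreator is the nested default creator shown in FileSplit.java above:

    Split s = FileSplit.FileSplitCreator.DEFAULT.create(
            new LocationPath("hdfs://nn/t/f.orc", Maps.newHashMap()),
            0, 128L << 20, 512L << 20, 0, null, null);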
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index ad36dc221d8..006ed83413a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -369,11 +369,7 @@ public class HiveMetaStoreCache {
for (RemoteFile remoteFile : remoteFiles) {
String srcPath = remoteFile.getPath().toString();
                LocationPath locationPath = new LocationPath(srcPath, catalog.getProperties());
- Path convertedPath = locationPath.toStorageLocation();
- if (!convertedPath.toString().equals(srcPath)) {
- remoteFile.setPath(convertedPath);
- }
- result.addFile(remoteFile);
+ result.addFile(remoteFile, locationPath);
}
} else if (status.getErrCode().equals(ErrCode.NOT_FOUND)) {
// User may manually remove partition under HDFS, in this case,
@@ -813,14 +809,17 @@ public class HiveMetaStoreCache {
if (status.ok()) {
if (delta.isDeleteDelta()) {
                        List<String> deleteDeltaFileNames = remoteFiles.stream().map(f -> f.getName()).filter(
-                                name -> name.startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
+                                        name -> name.startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
.collect(Collectors.toList());
                        deleteDeltas.add(new DeleteDeltaInfo(location, deleteDeltaFileNames));
continue;
}
                    remoteFiles.stream().filter(
-                            f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
-                            .forEach(fileCacheValue::addFile);
+                            f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)).forEach(file -> {
+                        LocationPath path = new LocationPath(file.getPath().toString(),
+                                catalog.getProperties());
+                        fileCacheValue.addFile(file, path);
+                    });
} else {
throw new RuntimeException(status.getErrMsg());
}
@@ -837,8 +836,12 @@ public class HiveMetaStoreCache {
Status status = fs.listFiles(location, false, remoteFiles);
if (status.ok()) {
                remoteFiles.stream().filter(
-                        f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
-                        .forEach(fileCacheValue::addFile);
+                        f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
+                        .forEach(file -> {
+                            LocationPath path = new LocationPath(file.getPath().toString(),
+                                    catalog.getProperties());
+                            fileCacheValue.addFile(file, path);
+                        });
} else {
throw new RuntimeException(status.getErrMsg());
}
@@ -998,11 +1001,11 @@ public class HiveMetaStoreCache {
private AcidInfo acidInfo;
- public void addFile(RemoteFile file) {
+ public void addFile(RemoteFile file, LocationPath locationPath) {
if (isFileVisible(file.getPath())) {
HiveFileStatus status = new HiveFileStatus();
status.setBlockLocations(file.getBlockLocations());
- status.setPath(file.getPath());
+ status.setPath(locationPath);
status.length = file.getSize();
status.blockSize = file.getBlockSize();
status.modificationTime = file.getModificationTime();
@@ -1014,7 +1017,6 @@ public class HiveMetaStoreCache {
return partitionValues == null ? 0 : partitionValues.size();
}
-
public AcidInfo getAcidInfo() {
return acidInfo;
}
@@ -1062,7 +1064,7 @@ public class HiveMetaStoreCache {
@Data
public static class HiveFileStatus {
BlockLocation[] blockLocations;
- Path path;
+ LocationPath path;
long length;
long blockSize;
long modificationTime;
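Sketch of the reworked cache-fill flow, assuming a RemoteFile from fs.listFiles as in the loops above; the LocationPath now travels with the cached HiveFileStatus instead of a pre-rewritten Path:

    LocationPath path = new LocationPath(remoteFile.getPath().toString(), catalog.getProperties());
    fileCacheValue.addFile(remoteFile, path);
    // conversion to the BE storage location now happens later, in the scan
    // node code, via the split's LocationPath.toStorageLocation().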
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
index be722b31c7b..db4161a4e23 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
@@ -27,10 +27,8 @@ import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
-import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
-import org.apache.doris.common.util.LocationPath;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.FileSplit;
@@ -52,7 +50,6 @@ import org.apache.doris.thrift.TFileAttributes;
import org.apache.doris.thrift.TFileCompressType;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileTextScanRangeParams;
-import org.apache.doris.thrift.TFileType;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -343,7 +340,7 @@ public class HiveScanNode extends FileQueryScanNode {
                allFiles.addAll(splitFile(status.getPath(), status.getBlockSize(),
                        status.getBlockLocations(), status.getLength(), status.getModificationTime(),
                        status.isSplittable(), status.getPartitionValues(),
- new HiveSplitCreator(status.getAcidInfo())));
+ new HiveSplitCreator(status.getAcidInfo())));
}
}
@@ -409,21 +406,6 @@ public class HiveScanNode extends FileQueryScanNode {
return hmsTable;
}
- @Override
- protected TFileType getLocationType() throws UserException {
-        return getLocationType(hmsTable.getRemoteTable().getSd().getLocation());
- }
-
- @Override
- protected TFileType getLocationType(String location) throws UserException {
- String bindBrokerName = hmsTable.getCatalog().bindBrokerName();
- if (bindBrokerName != null) {
- return TFileType.FILE_BROKER;
- }
-        return Optional.ofNullable(LocationPath.getTFileTypeForBE(location)).orElseThrow(() ->
-                new DdlException("Unknown file location " + location + " for hms table " + hmsTable.getName()));
- }
-
@Override
public TFileFormatType getFileFormatType() throws UserException {
TFileFormatType type = null;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveSplit.java
index 7c9345991fb..5dd63e734c9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveSplit.java
@@ -17,18 +17,17 @@
package org.apache.doris.datasource.hive.source;
+import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.FileSplit;
import org.apache.doris.datasource.SplitCreator;
import org.apache.doris.datasource.hive.AcidInfo;
import org.apache.doris.spi.Split;
-import org.apache.hadoop.fs.Path;
-
import java.util.List;
public class HiveSplit extends FileSplit {
-    public HiveSplit(Path path, long start, long length, long fileLength,
+    private HiveSplit(LocationPath path, long start, long length, long fileLength,
            long modificationTime, String[] hosts, List<String> partitionValues, AcidInfo acidInfo) {
        super(path, start, length, fileLength, modificationTime, hosts, partitionValues);
this.acidInfo = acidInfo;
@@ -53,12 +52,9 @@ public class HiveSplit extends FileSplit {
this.acidInfo = acidInfo;
}
- public HiveSplitCreator() {
- this(null);
- }
-
@Override
-        public Split create(Path path, long start, long length, long fileLength, long modificationTime, String[] hosts,
+        public Split create(LocationPath path, long start, long length, long fileLength,
+                long modificationTime, String[] hosts,
                List<String> partitionValues) {
            return new HiveSplit(path, start, length, fileLength, modificationTime, hosts, partitionValues, acidInfo);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java
index fa24dc53e56..5e76996bb12 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java
@@ -17,6 +17,7 @@
package org.apache.doris.datasource.hudi.source;
+import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.FileSplit;
import org.apache.doris.spi.Split;
@@ -210,14 +211,16 @@ public class COWIncrementalRelation implements IncrementalRelation {
: Collections.emptyList();
for (String baseFile : filteredMetaBootstrapFullPaths) {
HoodieWriteStat stat = fileToWriteStat.get(baseFile);
-            splits.add(new FileSplit(new Path(baseFile), 0, stat.getFileSizeInBytes(), stat.getFileSizeInBytes(),
-                    new String[0],
+            splits.add(new FileSplit(new LocationPath(baseFile, optParams), 0,
+                    stat.getFileSizeInBytes(), stat.getFileSizeInBytes(),
+                    0, new String[0],
                    HudiPartitionProcessor.parsePartitionValues(partitionNames, stat.getPartitionPath())));
}
for (String baseFile : filteredRegularFullPaths) {
HoodieWriteStat stat = fileToWriteStat.get(baseFile);
-            splits.add(new FileSplit(new Path(baseFile), 0, stat.getFileSizeInBytes(), stat.getFileSizeInBytes(),
-                    new String[0],
+            splits.add(new FileSplit(new LocationPath(baseFile, optParams), 0,
+                    stat.getFileSizeInBytes(), stat.getFileSizeInBytes(),
+                    0, new String[0],
                    HudiPartitionProcessor.parsePartitionValues(partitionNames, stat.getPartitionPath())));
}
return splits;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
index 66c14446845..abd5a377f5a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -240,7 +240,7 @@ public class HudiScanNode extends HiveScanNode {
}
}
- public void setHudiParams(TFileRangeDesc rangeDesc, HudiSplit hudiSplit) {
+ private void setHudiParams(TFileRangeDesc rangeDesc, HudiSplit hudiSplit) {
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(hudiSplit.getTableFormatType().value());
THudiFileDesc fileDesc = new THudiFileDesc();
@@ -351,8 +351,7 @@ public class HudiScanNode extends HiveScanNode {
long fileSize = baseFile.getFileSize();
// Need add hdfs host to location
                    LocationPath locationPath = new LocationPath(filePath, hmsTable.getCatalogProperties());
-                    Path splitFilePath = locationPath.toStorageLocation();
-                    splits.add(new FileSplit(splitFilePath, 0, fileSize, fileSize,
+                    splits.add(new FileSplit(locationPath, 0, fileSize, fileSize, 0,
new String[0], partition.getPartitionValues()));
});
} else {
@@ -362,7 +361,7 @@ public class HudiScanNode extends HiveScanNode {
}
}
-    private void getPartitionSplits(List<HivePartition> partitions, List<Split> splits) {
+    private void getPartitionsSplits(List<HivePartition> partitions, List<Split> splits) {
        Executor executor = Env.getCurrentEnv().getExtMetaCacheMgr().getFileListingExecutor();
CountDownLatch countDownLatch = new CountDownLatch(partitions.size());
AtomicReference<Throwable> throwable = new AtomicReference<>();
@@ -397,7 +396,7 @@ public class HudiScanNode extends HiveScanNode {
partitionInit = true;
}
List<Split> splits = Collections.synchronizedList(new ArrayList<>());
- getPartitionSplits(prunedPartitions, splits);
+ getPartitionsSplits(prunedPartitions, splits);
return splits;
}
@@ -482,8 +481,8 @@ public class HudiScanNode extends HiveScanNode {
// no base file, use log file to parse file type
String agencyPath = filePath.isEmpty() ? logs.get(0) : filePath;
-        HudiSplit split = new HudiSplit(new Path(agencyPath), 0, fileSize, fileSize,
-                new String[0], partitionValues);
+        HudiSplit split = new HudiSplit(new LocationPath(agencyPath, hmsTable.getCatalogProperties()),
+                0, fileSize, fileSize, new String[0], partitionValues);
split.setTableFormatType(TableFormatType.HUDI);
split.setDataFilePath(filePath);
split.setHudiDeltaLogs(logs);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiSplit.java
index 121dcf68005..c72f7621fea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiSplit.java
@@ -17,18 +17,18 @@
package org.apache.doris.datasource.hudi.source;
+import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.FileSplit;
import lombok.Data;
-import org.apache.hadoop.fs.Path;
import java.util.List;
@Data
public class HudiSplit extends FileSplit {
-    public HudiSplit(Path file, long start, long length, long fileLength, String[] hosts,
+    public HudiSplit(LocationPath file, long start, long length, long fileLength, String[] hosts,
List<String> partitionValues) {
- super(file, start, length, fileLength, hosts, partitionValues);
+ super(file, start, length, fileLength, 0, hosts, partitionValues);
}
private String instantTime;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index 56222d84955..2ca51298fe6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -22,7 +22,6 @@ import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
@@ -42,7 +41,6 @@ import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileRangeDesc;
-import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TIcebergDeleteFileDesc;
import org.apache.doris.thrift.TIcebergFileDesc;
import org.apache.doris.thrift.TPlanNode;
@@ -51,7 +49,6 @@ import org.apache.doris.thrift.TTableFormatFileDesc;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-import org.apache.hadoop.fs.Path;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DeleteFile;
@@ -133,7 +130,7 @@ public class IcebergScanNode extends FileQueryScanNode {
}
}
-    public void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit icebergSplit) {
+    private void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit icebergSplit) {
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(icebergSplit.getTableFormatType().value());
TIcebergFileDesc fileDesc = new TIcebergFileDesc();
@@ -147,8 +144,7 @@ public class IcebergScanNode extends FileQueryScanNode {
            TIcebergDeleteFileDesc deleteFileDesc = new TIcebergDeleteFileDesc();
            String deleteFilePath = filter.getDeleteFilePath();
            LocationPath locationPath = new LocationPath(deleteFilePath, icebergSplit.getConfig());
-            Path splitDeletePath = locationPath.toStorageLocation();
-            deleteFileDesc.setPath(splitDeletePath.toString());
+            deleteFileDesc.setPath(locationPath.toStorageLocation().toString());
            if (filter instanceof IcebergDeleteFileFilter.PositionDelete) {
                IcebergDeleteFileFilter.PositionDelete positionDelete = (IcebergDeleteFileFilter.PositionDelete) filter;
@@ -211,8 +207,6 @@ public class IcebergScanNode extends FileQueryScanNode {
        try (CloseableIterable<CombinedScanTask> combinedScanTasks = TableScanUtil.planTasks(fileScanTasks, fileSplitSize, 1, 0)) {
            combinedScanTasks.forEach(taskGrp -> taskGrp.files().forEach(splitTask -> {
-                String dataFilePath = normalizeLocation(splitTask.file().path().toString());
-
List<String> partitionValues = new ArrayList<>();
if (isPartitionedTable) {
StructLike structLike = splitTask.file().partition();
@@ -238,10 +232,10 @@ public class IcebergScanNode extends FileQueryScanNode {
// Counts the number of partitions read
partitionPathSet.add(structLike.toString());
}
-                LocationPath locationPath = new LocationPath(dataFilePath, source.getCatalog().getProperties());
-                Path finalDataFilePath = locationPath.toStorageLocation();
+                String originalPath = splitTask.file().path().toString();
+                LocationPath locationPath = new LocationPath(originalPath, source.getCatalog().getProperties());
IcebergSplit split = new IcebergSplit(
- finalDataFilePath,
+ locationPath,
splitTask.start(),
splitTask.length(),
splitTask.file().fileSizeInBytes(),
@@ -249,7 +243,7 @@ public class IcebergScanNode extends FileQueryScanNode {
formatVersion,
source.getCatalog().getProperties(),
partitionValues,
- splitTask.file().path().toString());
+ originalPath);
if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
split.setDeleteFileFilters(getDeleteFileFilters(splitTask));
}
@@ -311,36 +305,6 @@ public class IcebergScanNode extends FileQueryScanNode {
return filters;
}
- @Override
- public TFileType getLocationType() throws UserException {
- String location = icebergTable.location();
- return getLocationType(location);
- }
-
- @Override
- public TFileType getLocationType(String location) throws UserException {
- final String fLocation = normalizeLocation(location);
-        return Optional.ofNullable(LocationPath.getTFileTypeForBE(location)).orElseThrow(() ->
-                new DdlException("Unknown file location " + fLocation + " for iceberg table " + icebergTable.name()));
- }
-
- private String normalizeLocation(String location) {
- Map<String, String> props = source.getCatalog().getProperties();
- LocationPath locationPath = new LocationPath(location, props);
-        String icebergCatalogType = props.get(IcebergExternalCatalog.ICEBERG_CATALOG_TYPE);
- if ("hadoop".equalsIgnoreCase(icebergCatalogType)) {
- // if no scheme info, fill will HADOOP_FS_NAME
- // if no HADOOP_FS_NAME, then should be local file system
-            if (locationPath.getLocationType() == LocationPath.LocationType.NOSCHEME) {
- String fsName = props.get(HdfsResource.HADOOP_FS_NAME);
- if (fsName != null) {
- location = fsName + location;
- }
- }
- }
- return location;
- }
-
@Override
public TFileFormatType getFileFormatType() throws UserException {
TFileFormatType type;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
index d867245dbe3..8549e96bc2e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
@@ -17,10 +17,10 @@
package org.apache.doris.datasource.iceberg.source;
+import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.FileSplit;
import lombok.Data;
-import org.apache.hadoop.fs.Path;
import java.util.List;
import java.util.Map;
@@ -28,21 +28,23 @@ import java.util.Map;
@Data
public class IcebergSplit extends FileSplit {
+    // Doris will convert the schema in FileSystem to achieve the function of natively reading files.
+    // For example, s3a:// will be converted to s3://.
+    // The position delete file of iceberg will record the full path of the datafile, which includes the schema.
+    // When comparing datafile with position delete, the converted path cannot be used,
+    // but the original datafile path must be used.
private final String originalPath;
+ private Integer formatVersion;
+ private List<IcebergDeleteFileFilter> deleteFileFilters;
+ private Map<String, String> config;
    // File path will be changed if the file is modified, so there's no need to get modification time.
-    public IcebergSplit(Path file, long start, long length, long fileLength, String[] hosts,
+    public IcebergSplit(LocationPath file, long start, long length, long fileLength, String[] hosts,
Integer formatVersion, Map<String, String> config,
List<String> partitionList, String originalPath) {
- super(file, start, length, fileLength, hosts, partitionList);
+ super(file, start, length, fileLength, 0, hosts, partitionList);
this.formatVersion = formatVersion;
this.config = config;
this.originalPath = originalPath;
}
-
- private Integer formatVersion;
- private List<IcebergDeleteFileFilter> deleteFileFilters;
- private Map<String, String> config;
}
-
-
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
index 6521ecd3101..ea651df9fef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
@@ -23,11 +23,10 @@ import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.FileQueryScanNode;
-import org.apache.doris.datasource.FileSplit;
import org.apache.doris.datasource.TableFormatType;
import org.apache.doris.datasource.TablePartitionValues;
-import org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog;
import org.apache.doris.datasource.maxcompute.MaxComputeExternalTable;
import org.apache.doris.planner.ListPartitionPrunerV2;
import org.apache.doris.planner.PlanNodeId;
@@ -35,13 +34,12 @@ import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileRangeDesc;
-import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TMaxComputeFileDesc;
import org.apache.doris.thrift.TTableFormatFileDesc;
import com.aliyun.odps.Table;
import com.aliyun.odps.tunnel.TunnelException;
-import org.apache.hadoop.fs.Path;
+import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Collection;
@@ -53,8 +51,8 @@ import java.util.Map;
public class MaxComputeScanNode extends FileQueryScanNode {
private final MaxComputeExternalTable table;
- private final MaxComputeExternalCatalog catalog;
- public static final int MIN_SPLIT_SIZE = 4096;
+ private static final int MIN_SPLIT_SIZE = 4096;
+    private static final LocationPath VIRTUAL_SLICE_PART = new LocationPath("/virtual_slice_part", Maps.newHashMap());
    public MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) {
        this(id, desc, "MCScanNode", StatisticalType.MAX_COMPUTE_SCAN_NODE, needCheckColumnPriv);
@@ -64,7 +62,6 @@ public class MaxComputeScanNode extends FileQueryScanNode {
            StatisticalType statisticalType, boolean needCheckColumnPriv) {
super(id, desc, planNodeName, statisticalType, needCheckColumnPriv);
table = (MaxComputeExternalTable) desc.getTable();
- catalog = (MaxComputeExternalCatalog) table.getCatalog();
}
@Override
@@ -74,7 +71,7 @@ public class MaxComputeScanNode extends FileQueryScanNode {
}
}
-    public void setScanParams(TFileRangeDesc rangeDesc, MaxComputeSplit maxComputeSplit) {
+    private void setScanParams(TFileRangeDesc rangeDesc, MaxComputeSplit maxComputeSplit) {
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(TableFormatType.MAX_COMPUTE.value());
TMaxComputeFileDesc fileDesc = new TMaxComputeFileDesc();
@@ -85,16 +82,6 @@ public class MaxComputeScanNode extends FileQueryScanNode {
rangeDesc.setTableFormatParams(tableFormatFileDesc);
}
- @Override
- protected TFileType getLocationType() throws UserException {
- return getLocationType(null);
- }
-
- @Override
- protected TFileType getLocationType(String location) throws UserException {
- return TFileType.FILE_NET;
- }
-
@Override
public TFileFormatType getFileFormatType() {
return TFileFormatType.FORMAT_JNI;
@@ -144,10 +131,8 @@ public class MaxComputeScanNode extends FileQueryScanNode {
    private static void addPartitionSplits(List<Split> result, Table odpsTable, String partitionSpec) {
        long modificationTime = odpsTable.getLastDataModifiedTime().getTime();
        // use '-1' to read whole partition, avoid expending too much time on calling table.getTotalRows()
- Pair<Long, Long> range = Pair.of(0L, -1L);
- FileSplit rangeSplit = new FileSplit(new Path("/virtual_slice_part"),
- range.first, range.second, -1, modificationTime, null,
Collections.emptyList());
- result.add(new MaxComputeSplit(partitionSpec, rangeSplit));
+ result.add(new MaxComputeSplit(VIRTUAL_SLICE_PART,
+ 0, -1L, -1, modificationTime, null, Collections.emptyList(),
null));
}
private static void addBatchSplits(List<Split> result, Table odpsTable, long totalRows) {
@@ -171,9 +156,8 @@ public class MaxComputeScanNode extends FileQueryScanNode {
if (!sliceRange.isEmpty()) {
for (int i = 0; i < sliceRange.size(); i++) {
Pair<Long, Long> range = sliceRange.get(i);
- FileSplit rangeSplit = new FileSplit(new Path("/virtual_slice_" + i),
- range.first, range.second, totalRows, modificationTime, null, Collections.emptyList());
- result.add(new MaxComputeSplit(rangeSplit));
+ result.add(new MaxComputeSplit(new LocationPath("/virtual_slice_" + i, Maps.newHashMap()),
+ range.first, range.second, totalRows, modificationTime, null, Collections.emptyList(), null));
}
}
}
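Illustration (not part of the patch): with the refactor above, a whole-partition MaxCompute split is built directly from a shared LocationPath instead of an org.apache.hadoop.fs.Path. A minimal sketch, with modificationTime as a stand-in value:

    import org.apache.doris.common.util.LocationPath;
    import com.google.common.collect.Maps;
    import java.util.Collections;

    // Shared, schemeless location used for whole-partition reads.
    LocationPath virtualPart = new LocationPath("/virtual_slice_part", Maps.newHashMap());
    long modificationTime = System.currentTimeMillis(); // stand-in value
    // length = -1 reads the whole partition; fileLength = -1 means unknown.
    MaxComputeSplit split = new MaxComputeSplit(virtualPart,
            0, -1L, -1, modificationTime, null, Collections.emptyList(), null);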
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeSplit.java
index 20b285c4cfc..256ee1adefb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeSplit.java
@@ -17,23 +17,22 @@
package org.apache.doris.datasource.maxcompute.source;
+import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.FileSplit;
+import org.apache.doris.thrift.TFileType;
+import java.util.List;
import java.util.Optional;
public class MaxComputeSplit extends FileSplit {
private final Optional<String> partitionSpec;
- public MaxComputeSplit(FileSplit rangeSplit) {
- super(rangeSplit.path, rangeSplit.start, rangeSplit.length, rangeSplit.fileLength,
- rangeSplit.hosts, rangeSplit.partitionValues);
- this.partitionSpec = Optional.empty();
- }
-
- public MaxComputeSplit(String partitionSpec, FileSplit rangeSplit) {
- super(rangeSplit.path, rangeSplit.start, rangeSplit.length, rangeSplit.fileLength,
- rangeSplit.hosts, rangeSplit.partitionValues);
- this.partitionSpec = Optional.of(partitionSpec);
+ public MaxComputeSplit(LocationPath path, long start, long length, long fileLength,
+ long modificationTime, String[] hosts, List<String> partitionValues, String partitionSpec) {
+ super(path, start, length, fileLength, modificationTime, hosts, partitionValues);
+ this.partitionSpec = Optional.ofNullable(partitionSpec);
+ // MC always use FILE_NET type
+ this.locationType = TFileType.FILE_NET;
}
public Optional<String> getPartitionSpec() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 8ed8af15d86..27b40b5bcc9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -35,19 +35,16 @@ import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileRangeDesc;
-import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TPaimonDeletionFileDesc;
import org.apache.doris.thrift.TPaimonFileDesc;
import org.apache.doris.thrift.TTableFormatFileDesc;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
-import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.table.source.RawFile;
@@ -147,7 +144,7 @@ public class PaimonScanNode extends FileQueryScanNode {
}
}
- public void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit) {
+ private void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit) {
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(paimonSplit.getTableFormatType().value());
TPaimonFileDesc fileDesc = new TPaimonFileDesc();
@@ -214,10 +211,9 @@ public class PaimonScanNode extends FileQueryScanNode {
DeletionFile deletionFile = deletionFiles.get(i);
LocationPath locationPath = new LocationPath(file.path(), source.getCatalog().getProperties());
- Path finalDataFilePath = locationPath.toStorageLocation();
try {
List<Split> dorisSplits = splitFile(
- finalDataFilePath,
+ locationPath,
0,
null,
file.length(),
@@ -242,11 +238,10 @@ public class PaimonScanNode extends FileQueryScanNode {
for (RawFile file : rawFiles) {
LocationPath locationPath = new LocationPath(file.path(), source.getCatalog().getProperties());
- Path finalDataFilePath = locationPath.toStorageLocation();
try {
splits.addAll(
splitFile(
- finalDataFilePath,
+ locationPath,
0,
null,
file.length(),
@@ -286,17 +281,6 @@ public class PaimonScanNode extends FileQueryScanNode {
}
}
- @Override
- public TFileType getLocationType() throws DdlException, MetaNotFoundException {
- return getLocationType(((FileStoreTable) source.getPaimonTable()).location().toString());
- }
-
- @Override
- public TFileType getLocationType(String location) throws DdlException, MetaNotFoundException {
- return Optional.ofNullable(LocationPath.getTFileTypeForBE(location)).orElseThrow(() ->
- new DdlException("Unknown file location " + location + " for paimon table "));
- }
-
@Override
public TFileFormatType getFileFormatType() throws DdlException, MetaNotFoundException {
return TFileFormatType.FORMAT_JNI;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java
index 9ac44537e8a..44063e3226b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java
@@ -33,7 +33,6 @@ import java.util.Map;
public class PaimonSource {
private final PaimonExternalTable paimonExtTable;
private final Table originTable;
-
private final TupleDescriptor desc;
public PaimonSource(PaimonExternalTable table, TupleDescriptor desc,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java
index 6cca70577f8..ffd063d77e8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java
@@ -17,11 +17,12 @@
package org.apache.doris.datasource.paimon.source;
+import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.FileSplit;
import org.apache.doris.datasource.SplitCreator;
import org.apache.doris.datasource.TableFormatType;
-import org.apache.hadoop.fs.Path;
+import com.google.common.collect.Maps;
import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.table.source.Split;
@@ -29,20 +30,21 @@ import java.util.List;
import java.util.Optional;
public class PaimonSplit extends FileSplit {
+ private static final LocationPath DUMMY_PATH = new LocationPath("/dummyPath", Maps.newHashMap());
private Split split;
private TableFormatType tableFormatType;
private Optional<DeletionFile> optDeletionFile;
public PaimonSplit(Split split) {
- super(new Path("hdfs://dummyPath"), 0, 0, 0, null, null);
+ super(DUMMY_PATH, 0, 0, 0, 0, null, null);
this.split = split;
this.tableFormatType = TableFormatType.PAIMON;
this.optDeletionFile = Optional.empty();
}
- public PaimonSplit(Path file, long start, long length, long fileLength, String[] hosts,
- List<String> partitionList) {
- super(file, start, length, fileLength, hosts, partitionList);
+ private PaimonSplit(LocationPath file, long start, long length, long fileLength, long modificationTime,
+ String[] hosts, List<String> partitionList) {
+ super(file, start, length, fileLength, modificationTime, hosts, partitionList);
this.tableFormatType = TableFormatType.PAIMON;
this.optDeletionFile = Optional.empty();
}
@@ -51,10 +53,6 @@ public class PaimonSplit extends FileSplit {
return split;
}
- public void setSplit(Split split) {
- this.split = split;
- }
-
public TableFormatType getTableFormatType() {
return tableFormatType;
}
@@ -76,14 +74,14 @@ public class PaimonSplit extends FileSplit {
static final PaimonSplitCreator DEFAULT = new PaimonSplitCreator();
@Override
- public org.apache.doris.spi.Split create(Path path,
+ public org.apache.doris.spi.Split create(LocationPath path,
long start,
long length,
long fileLength,
long modificationTime,
String[] hosts,
List<String> partitionValues) {
- return new PaimonSplit(path, start, length, fileLength, hosts, partitionValues);
+ return new PaimonSplit(path, start, length, fileLength, modificationTime, hosts, partitionValues);
}
}
}
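Illustration (not part of the patch): the SplitCreator hook shown above lets each table format supply a factory that the scan node's file-splitting logic calls back into for every byte range it carves out. A minimal creator, assuming the interface shape visible in PaimonSplitCreator; the class name is hypothetical and plain FileSplits stand in for a format-specific split:

    import org.apache.doris.common.util.LocationPath;
    import org.apache.doris.datasource.FileSplit;
    import org.apache.doris.datasource.SplitCreator;
    import org.apache.doris.spi.Split;
    import java.util.List;

    // Hypothetical creator that produces plain FileSplits for each range.
    public class PlainFileSplitCreator implements SplitCreator {
        @Override
        public Split create(LocationPath path, long start, long length, long fileLength,
                long modificationTime, String[] hosts, List<String> partitionValues) {
            return new FileSplit(path, start, length, fileLength,
                    modificationTime, hosts, partitionValues);
        }
    }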
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
index 26b90c26a46..b0f0406c215 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
@@ -24,6 +24,7 @@ import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.LocationPath;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.FileSplit;
@@ -41,7 +42,7 @@ import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import com.google.common.collect.Lists;
-import org.apache.hadoop.fs.Path;
+import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -101,21 +102,16 @@ public class TVFScanNode extends FileQueryScanNode {
@Override
protected TFileCompressType getFileCompressType(FileSplit fileSplit) throws UserException {
TFileCompressType fileCompressType = tableValuedFunction.getTFileCompressType();
- return Util.getOrInferCompressType(fileCompressType, fileSplit.getPath().toString());
+ return Util.getOrInferCompressType(fileCompressType, fileSplit.getPathString());
}
@Override
- public TFileType getLocationType() throws DdlException, MetaNotFoundException {
- return getLocationType(null);
+ protected boolean isFileStreamType() {
+ return tableValuedFunction.getTFileType() == TFileType.FILE_STREAM;
}
@Override
- public TFileType getLocationType(String location) throws DdlException, MetaNotFoundException {
- return tableValuedFunction.getTFileType();
- }
-
- @Override
- public Map<String, String> getLocationProperties() throws MetaNotFoundException, DdlException {
+ public Map<String, String> getLocationProperties() {
return tableValuedFunction.getLocationProperties();
}
@@ -137,13 +133,14 @@ public class TVFScanNode extends FileQueryScanNode {
}
List<TBrokerFileStatus> fileStatuses = tableValuedFunction.getFileStatuses();
for (TBrokerFileStatus fileStatus : fileStatuses) {
- Path path = new Path(fileStatus.getPath());
+ Map<String, String> prop = Maps.newHashMap();
try {
- splits.addAll(splitFile(path, fileStatus.getBlockSize(), null, fileStatus.getSize(),
- fileStatus.getModificationTime(), fileStatus.isSplitable, null, FileSplitCreator.DEFAULT));
+ splits.addAll(splitFile(new LocationPath(fileStatus.getPath(), prop), fileStatus.getBlockSize(),
+ null, fileStatus.getSize(), fileStatus.getModificationTime(),
+ fileStatus.isSplitable, null, FileSplitCreator.DEFAULT));
} catch (IOException e) {
- LOG.warn("get file split failed for TVF: {}", path, e);
+ LOG.warn("get file split failed for TVF: {}", fileStatus.getPath(), e);
throw new UserException(e);
}
}
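Illustration (not part of the patch): each TBrokerFileStatus is now wrapped in a LocationPath before splitting, with an empty property map because a TVF location carries its scheme in the path itself. A hedged sketch of just the wrapping step, with fileStatus as in the loop above:

    import org.apache.doris.common.util.LocationPath;
    import com.google.common.collect.Maps;
    import java.util.Map;

    // No catalog properties are consulted for TVF locations.
    Map<String, String> prop = Maps.newHashMap();
    LocationPath locationPath = new LocationPath(fileStatus.getPath(), prop);
    // locationPath is then handed to splitFile(...) together with
    // FileSplitCreator.DEFAULT, as the hunk above shows.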
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
index efa9bd9b8f8..bbd278afef4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
@@ -118,7 +118,7 @@ public class HiveTableSink extends BaseExternalTableDataSink {
THiveLocationParams locationParams = new THiveLocationParams();
LocationPath locationPath = new LocationPath(sd.getLocation(), targetTable.getHadoopProperties());
- String location = locationPath.toString();
+ String location = locationPath.getPath().toString();
String storageLocation = locationPath.toStorageLocation().toString();
TFileType fileType = locationPath.getTFileTypeForBE();
if (fileType == TFileType.FILE_S3) {
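For context: a single LocationPath now yields the three views of a location the sink needs, exactly as the retained lines above show (sd and targetTable are as in the hunk):

    import org.apache.doris.common.util.LocationPath;
    import org.apache.doris.thrift.TFileType;

    LocationPath locationPath = new LocationPath(sd.getLocation(), targetTable.getHadoopProperties());
    String location = locationPath.getPath().toString();                  // original form, kept on the FE side
    String storageLocation = locationPath.toStorageLocation().toString(); // normalized form sent to the BE
    TFileType fileType = locationPath.getTFileTypeForBE();                // scheme mapped to a BE file type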
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
index 0e01b599964..bfacb572305 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
@@ -133,7 +133,7 @@ public class IcebergTableSink extends BaseExternalTableDataSink {
// location
LocationPath locationPath = new LocationPath(IcebergUtils.dataLocation(icebergTable), catalogProps);
tSink.setOutputPath(locationPath.toStorageLocation().toString());
- tSink.setOriginalOutputPath(locationPath.toString());
+ tSink.setOriginalOutputPath(locationPath.getPath().toString());
tSink.setFileType(locationPath.getTFileTypeForBE());
if (insertCtx.isPresent()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index 5113fa6dd5c..e4c52971813 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -71,7 +71,6 @@ import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TTextSerdeType;
import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
@@ -97,23 +96,6 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
public static final String PROP_TABLE_ID = "table_id";
- protected static final ImmutableSet<String> FILE_FORMAT_PROPERTIES = new ImmutableSet.Builder<String>()
- .add(FileFormatConstants.PROP_FORMAT)
- .add(FileFormatConstants.PROP_JSON_ROOT)
- .add(FileFormatConstants.PROP_JSON_PATHS)
- .add(FileFormatConstants.PROP_STRIP_OUTER_ARRAY)
- .add(FileFormatConstants.PROP_READ_JSON_BY_LINE)
- .add(FileFormatConstants.PROP_NUM_AS_STRING)
- .add(FileFormatConstants.PROP_FUZZY_PARSE)
- .add(FileFormatConstants.PROP_COLUMN_SEPARATOR)
- .add(FileFormatConstants.PROP_LINE_DELIMITER)
- .add(FileFormatConstants.PROP_TRIM_DOUBLE_QUOTES)
- .add(FileFormatConstants.PROP_SKIP_LINES)
- .add(FileFormatConstants.PROP_CSV_SCHEMA)
- .add(FileFormatConstants.PROP_COMPRESS_TYPE)
- .add(FileFormatConstants.PROP_PATH_PARTITION_KEYS)
- .build();
-
// Columns got from file and path(if has)
protected List<Column> columns = null;
// User specified csv columns, it will override columns got from file
diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java
index 69130f57fff..23f052d6131 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java
@@ -17,6 +17,8 @@
package org.apache.doris.common.util;
+import org.apache.doris.catalog.HdfsResource;
+import org.apache.doris.common.util.LocationPath.Scheme;
import org.apache.doris.fs.FileSystemType;
import org.junit.jupiter.api.Assertions;
@@ -63,8 +65,20 @@ public class LocationPathTest {
Assertions.assertTrue(locationPath.get().startsWith("/dir")
&& !locationPath.get().startsWith("hdfs://"));
Assertions.assertTrue(beLocation.startsWith("/dir") &&
!beLocation.startsWith("hdfs://"));
- }
+ props.clear();
+ props.put(HdfsResource.HADOOP_FS_NAME, "hdfs://test.com");
+ locationPath = new LocationPath("/dir/file.path", props);
+ Assertions.assertTrue(locationPath.get().startsWith("hdfs://"));
+ Assertions.assertEquals("hdfs://test.com/dir/file.path",
locationPath.get());
+ Assertions.assertEquals("hdfs://test.com/dir/file.path",
locationPath.toStorageLocation().toString());
+ props.clear();
+ props.put(HdfsResource.HADOOP_FS_NAME, "oss://test.com");
+ locationPath = new LocationPath("/dir/file.path", props);
+ Assertions.assertTrue(locationPath.get().startsWith("oss://"));
+ Assertions.assertEquals("oss://test.com/dir/file.path",
locationPath.get());
+ Assertions.assertEquals("s3://test.com/dir/file.path",
locationPath.toStorageLocation().toString());
+ }
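These assertions capture the "default fs" behavior the change adds: a schemeless path is qualified with HADOOP_FS_NAME (the fs.defaultFS key), and object-store schemes are normalized for the BE (oss:// stays oss:// on the FE but becomes s3:// in the storage location). A short sketch of the first case, following the test above:

    import org.apache.doris.catalog.HdfsResource;
    import org.apache.doris.common.util.LocationPath;
    import com.google.common.collect.Maps;
    import java.util.Map;

    Map<String, String> props = Maps.newHashMap();
    props.put(HdfsResource.HADOOP_FS_NAME, "hdfs://test.com");
    LocationPath p = new LocationPath("/dir/file.path", props);
    // Per the assertions above, p.get() and p.toStorageLocation() both
    // yield "hdfs://test.com/dir/file.path".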
@Test
public void testJFSLocationConvert() {
@@ -171,7 +185,7 @@ public class LocationPathTest {
LocationPath locationPath = new LocationPath("unknown://test.com",
rangeProps);
// FE
Assertions.assertTrue(locationPath.get().startsWith("unknown://"));
- Assertions.assertTrue(locationPath.getLocationType() == LocationPath.LocationType.UNKNOWN);
+ Assertions.assertTrue(locationPath.getScheme() == Scheme.UNKNOWN);
// BE
String beLocation = locationPath.toStorageLocation().toString();
Assertions.assertTrue(beLocation.startsWith("unknown://"));
@@ -184,7 +198,7 @@ public class LocationPathTest {
LocationPath locationPath = new LocationPath("/path/to/local",
rangeProps);
// FE
Assertions.assertTrue(locationPath.get().equalsIgnoreCase("/path/to/local"));
- Assertions.assertTrue(locationPath.getLocationType() == LocationPath.LocationType.NOSCHEME);
+ Assertions.assertTrue(locationPath.getScheme() == Scheme.NOSCHEME);
// BE
String beLocation = locationPath.toStorageLocation().toString();
Assertions.assertTrue(beLocation.equalsIgnoreCase("/path/to/local"));
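For context: LocationType is renamed to Scheme, and callers branch on it; roughly, per the two tests above (NOSCHEME with no default fs passes the location through unchanged):

    import org.apache.doris.common.util.LocationPath;
    import org.apache.doris.common.util.LocationPath.Scheme;
    import com.google.common.collect.Maps;

    LocationPath path = new LocationPath("/path/to/local", Maps.newHashMap());
    if (path.getScheme() == Scheme.NOSCHEME) {
        // no scheme and no default fs: the location is passed through as-is
    }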
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
index 82f46862674..57e64f0f223 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
@@ -20,6 +20,7 @@ package org.apache.doris.planner;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.FederationBackendPolicy;
import org.apache.doris.datasource.FileSplit;
import org.apache.doris.datasource.NodeSelectionStrategy;
@@ -76,28 +77,28 @@ public class FederationBackendPolicyTest {
};
List<Split> splits = new ArrayList<>();
- splits.add(new FileSplit(new Path(
+ splits.add(new FileSplit(new LocationPath(
"hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00000-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
0, 112140970, 112140970, 0, null, Collections.emptyList()));
- splits.add(new FileSplit(new Path(
+ splits.add(new FileSplit(new LocationPath(
"hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00001-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
0, 120839661, 120839661, 0, null, Collections.emptyList()));
- splits.add(new FileSplit(new Path(
+ splits.add(new FileSplit(new LocationPath(
"hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00002-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
0, 108897409, 108897409, 0, null, Collections.emptyList()));
- splits.add(new FileSplit(new Path(
+ splits.add(new FileSplit(new LocationPath(
"hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00003-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
0, 95795997, 95795997, 0, null, Collections.emptyList()));
- splits.add(new FileSplit(new Path(
+ splits.add(new FileSplit(new LocationPath(
"hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
0, 104600402, 104600402, 0, null, Collections.emptyList()));
- splits.add(new FileSplit(new Path(
+ splits.add(new FileSplit(new LocationPath(
"hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00005-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
0, 104600402, 104600402, 0, null, Collections.emptyList()));
- splits.add(new FileSplit(new Path(
+ splits.add(new FileSplit(new LocationPath(
"hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00006-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
0, 104600402, 104600402, 0, null, Collections.emptyList()));
- splits.add(new FileSplit(new Path(
+ splits.add(new FileSplit(new LocationPath(
"hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00007-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
0, 105664025, 105664025, 0, null, Collections.emptyList()));
@@ -141,28 +142,28 @@ public class FederationBackendPolicyTest {
};
List<Split> splits = new ArrayList<>();
- splits.add(new FileSplit(new Path(
+ splits.add(new FileSplit(new LocationPath(
"hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00000-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
0, 112140970, 112140970, 0, new String[] {"172.30.0.100"}, Collections.emptyList()));
- splits.add(new FileSplit(new Path(
+ splits.add(new FileSplit(new LocationPath(
"hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00001-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
0, 120839661, 120839661, 0, null, Collections.emptyList()));
- splits.add(new FileSplit(new Path(
+ splits.add(new FileSplit(new LocationPath(
"hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00002-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
0, 108897409, 108897409, 0, null, Collections.emptyList()));
- splits.add(new FileSplit(new Path(
+ splits.add(new FileSplit(new LocationPath(
"hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00003-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
0, 95795997, 95795997, 0, new String[] {"172.30.0.106"}, Collections.emptyList()));
- splits.add(new FileSplit(new Path(
+ splits.add(new FileSplit(new LocationPath(
"hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
0, 104600402, 104600402, 0, null, Collections.emptyList()));
- splits.add(new FileSplit(new Path(
+ splits.add(new FileSplit(new LocationPath(
"hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00005-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
0, 104600402, 104600402, 0, null, Collections.emptyList()));
- splits.add(new FileSplit(new Path(
+ splits.add(new FileSplit(new LocationPath(
"hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00006-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
0, 104600402, 104600402, 0, null, Collections.emptyList()));
- splits.add(new FileSplit(new Path(
+ splits.add(new FileSplit(new LocationPath(
"hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00007-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
0, 105664025, 105664025, 0, null, Collections.emptyList()));
@@ -178,11 +179,11 @@ public class FederationBackendPolicyTest {
for (Split split : assignedSplits) {
FileSplit fileSplit = (FileSplit) split;
++totalSplitNum;
- if (fileSplit.getPath().equals(new Path(
+ if (fileSplit.getPath().getPath().equals(new Path(
"hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00000-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc")))
{
Assert.assertEquals("172.30.0.100", backend.getHost());
checkedLocalSplit.add(true);
- } else if (fileSplit.getPath().equals(new Path(
+ } else if (fileSplit.getPath().getPath().equals(new Path(
"hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00003-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc")))
{
Assert.assertEquals("172.30.0.106", backend.getHost());
checkedLocalSplit.add(true);
@@ -235,28 +236,28 @@ public class FederationBackendPolicyTest {
};
List<Split> splits = new ArrayList<>();
- splits.add(new FileSplit(new Path(
+ splits.add(new FileSplit(new LocationPath(
"hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00000-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
0, 112140970, 112140970, 0, null, Collections.emptyList()));
- splits.add(new FileSplit(new Path(
+ splits.add(new FileSplit(new LocationPath(
"hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00001-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
0, 120839661, 120839661, 0, null, Collections.emptyList()));
- splits.add(new FileSplit(new Path(
+ splits.add(new FileSplit(new LocationPath(
"hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00002-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
0, 108897409, 108897409, 0, null, Collections.emptyList()));
- splits.add(new FileSplit(new Path(
+ splits.add(new FileSplit(new LocationPath(
"hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00003-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
0, 95795997, 95795997, 0, null, Collections.emptyList()));
- splits.add(new FileSplit(new Path(
+ splits.add(new FileSplit(new LocationPath(
"hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
0, 104600402, 104600402, 0, null, Collections.emptyList()));
- splits.add(new FileSplit(new Path(
+ splits.add(new FileSplit(new LocationPath(
"hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00005-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
0, 104600402, 104600402, 0, null, Collections.emptyList()));
- splits.add(new FileSplit(new Path(
+ splits.add(new FileSplit(new LocationPath(
"hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00006-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
0, 104600402, 104600402, 0, null, Collections.emptyList()));
- splits.add(new FileSplit(new Path(
+ splits.add(new FileSplit(new LocationPath(
"hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00007-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
0, 105664025, 105664025, 0, null, Collections.emptyList()));
@@ -344,7 +345,7 @@ public class FederationBackendPolicyTest {
int splitCount = random.nextInt(1000 - 100) + 100;
for (int i = 0; i < splitCount; ++i) {
long splitLength = random.nextInt(115343360 - 94371840) + 94371840;
- FileSplit split = new FileSplit(new Path(
+ FileSplit split = new FileSplit(new LocationPath(
"hdfs://HDFS00001/usr/hive/warehouse/test.db/test_table/"
+ UUID.randomUUID()),
0, splitLength, splitLength, 0, null,
Collections.emptyList());
remoteSplits.add(split);
@@ -364,7 +365,7 @@ public class FederationBackendPolicyTest {
totalLocalHosts.add(localHost);
}
long localSplitLength = random.nextInt(115343360 - 94371840) + 94371840;
- FileSplit split = new FileSplit(new Path(
+ FileSplit split = new FileSplit(new LocationPath(
"hdfs://HDFS00001/usr/hive/warehouse/test.db/test_table/"
+ UUID.randomUUID()),
0, localSplitLength, localSplitLength, 0,
localHosts.toArray(new String[0]),
Collections.emptyList());
@@ -467,7 +468,7 @@ public class FederationBackendPolicyTest {
int splitCount = random.nextInt(1000 - 100) + 100;
for (int i = 0; i < splitCount; ++i) {
long splitLength = random.nextInt(115343360 - 94371840) + 94371840;
- FileSplit split = new FileSplit(new Path(
+ FileSplit split = new FileSplit(new LocationPath(
"hdfs://HDFS00001/usr/hive/warehouse/test.db/test_table/"
+ UUID.randomUUID()),
0, splitLength, splitLength, 0, null,
Collections.emptyList());
remoteSplits.add(split);
@@ -487,7 +488,7 @@ public class FederationBackendPolicyTest {
totalLocalHosts.add(localHost);
}
long localSplitLength = random.nextInt(115343360 - 94371840) + 94371840;
- FileSplit split = new FileSplit(new Path(
+ FileSplit split = new FileSplit(new LocationPath(
"hdfs://HDFS00001/usr/hive/warehouse/test.db/test_table/"
+ UUID.randomUUID()),
0, localSplitLength, localSplitLength, 0,
localHosts.toArray(new String[0]),
Collections.emptyList());
@@ -604,28 +605,28 @@ public class FederationBackendPolicyTest {
};
List<Split> splits = new ArrayList<>();
- splits.add(new FileSplit(new Path(
+ splits.add(new FileSplit(new LocationPath(
"hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00000-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
0, 112140970, 112140970, 0, null, Collections.emptyList()));
- splits.add(new FileSplit(new Path(
+ splits.add(new FileSplit(new LocationPath(
"hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00001-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
0, 120839661, 120839661, 0, null, Collections.emptyList()));
- splits.add(new FileSplit(new Path(
+ splits.add(new FileSplit(new LocationPath(
"hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00002-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
0, 108897409, 108897409, 0, null, Collections.emptyList()));
- splits.add(new FileSplit(new Path(
+ splits.add(new FileSplit(new LocationPath(
"hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00003-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
0, 95795997, 95795997, 0, null, Collections.emptyList()));
- splits.add(new FileSplit(new Path(
+ splits.add(new FileSplit(new LocationPath(
"hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
0, 104600402, 104600402, 0, null, Collections.emptyList()));
- splits.add(new FileSplit(new Path(
+ splits.add(new FileSplit(new LocationPath(
"hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00005-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
0, 104600402, 104600402, 0, null, Collections.emptyList()));
- splits.add(new FileSplit(new Path(
+ splits.add(new FileSplit(new LocationPath(
"hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00006-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
0, 104600402, 104600402, 0, null, Collections.emptyList()));
- splits.add(new FileSplit(new Path(
+ splits.add(new FileSplit(new LocationPath(
"hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00007-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
0, 105664025, 105664025, 0, null, Collections.emptyList()));
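For reference, the FileSplit shape these fixtures now use, taken directly from the test code above:

    import org.apache.doris.common.util.LocationPath;
    import org.apache.doris.datasource.FileSplit;
    import java.util.Collections;

    // (path, start, length, fileLength, modificationTime, hosts, partitionValues)
    FileSplit split = new FileSplit(
            new LocationPath("hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc"
                    + "/part-00000-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
            0, 112140970, 112140970, 0, null, Collections.emptyList());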
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]