This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 74ddc0d61d4 [improvement](fe and broker) support specify broker to
getSplits, check isSplitable, file scan for HMS Multi-catalog (#24830) (#27236)
74ddc0d61d4 is described below
commit 74ddc0d61d4e82c743e8ad6d45c9b4de9c61e806
Author: DuRipeng <[email protected]>
AuthorDate: Sun Nov 19 22:20:47 2023 +0800
[improvement](fe and broker) support specify broker to getSplits, check
isSplitable, file scan for HMS Multi-catalog (#24830) (#27236)
bp #24830
---
.../apache/doris/datasource/ExternalCatalog.java | 8 ++
.../doris/datasource/HMSExternalCatalog.java | 2 +
.../doris/datasource/hive/HiveMetaStoreCache.java | 54 ++++++----
.../apache/doris/external/hive/util/HiveUtil.java | 12 ++-
.../java/org/apache/doris/fs/FileSystemCache.java | 17 +++-
.../org/apache/doris/fs/FileSystemFactory.java | 11 +-
.../java/org/apache/doris/fs/FileSystemType.java | 1 +
.../apache/doris/fs/remote/BrokerFileSystem.java | 87 ++++++++++++++++
.../doris/planner/external/HiveScanNode.java | 25 +++--
.../doris/statistics/util/StatisticsUtil.java | 3 +-
fs_brokers/apache_hdfs_broker/pom.xml | 26 ++++-
.../doris/broker/hdfs/FileSystemManager.java | 55 ++++++++--
.../doris/broker/hdfs/HDFSBrokerServiceImpl.java | 44 ++++++++
.../java/org/apache/doris/common/HiveUtils.java | 112 +++++++++++++++++++++
gensrc/thrift/PaloBrokerService.thrift | 20 ++++
15 files changed, 431 insertions(+), 46 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index 27bd524c1eb..1ed6bd027aa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -592,6 +592,14 @@ public abstract class ExternalCatalog
return ret;
}
+ public String bindBrokerName() {
+ Map<String, String> properties = catalogProperty.getProperties();
+ if (properties.containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
+ return properties.get(HMSExternalCatalog.BIND_BROKER_NAME);
+ }
+ return null;
+ }
+
@Override
public Collection<DatabaseIf> getAllDbs() {
makeSureInitialized();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
index 47562ebb1f1..6e3543dfcce 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
@@ -61,6 +61,8 @@ public class HMSExternalCatalog extends ExternalCatalog {
private long lastSyncedEventId = -1L;
public static final String ENABLE_SELF_SPLITTER = "enable.self.splitter";
public static final String FILE_META_CACHE_TTL_SECOND =
"file.meta.cache.ttl-second";
+ // broker name for file split and query scan.
+ public static final String BIND_BROKER_NAME = "broker.name";
private static final String PROP_ALLOW_FALLBACK_TO_SIMPLE_AUTH =
"ipc.client.fallback-to-simple-auth-allowed";
// -1 means file cache no ttl set
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 850619cb246..bac891eb920 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -353,11 +353,13 @@ public class HiveMetaStoreCache {
// Get File Status by using FileSystem API.
private FileCacheValue getFileCache(String location, InputFormat<?, ?>
inputFormat,
JobConf jobConf,
- List<String> partitionValues) throws
UserException {
+ List<String> partitionValues,
+ String bindBrokerName) throws
UserException {
FileCacheValue result = new FileCacheValue();
- result.setSplittable(HiveUtil.isSplittable(inputFormat, new
Path(location), jobConf));
RemoteFileSystem fs =
Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
- new
FileSystemCache.FileSystemCacheKey(FileSystemFactory.getFSIdentity(location),
jobConf));
+ new
FileSystemCache.FileSystemCacheKey(FileSystemFactory.getFSIdentity(
+ location, bindBrokerName), jobConf, bindBrokerName));
+ result.setSplittable(HiveUtil.isSplittable(fs, inputFormat, location,
jobConf));
try {
// For Tez engine, it may generate subdirectories for "union" query.
// So there may be files and directories in the table directory at
the same time. eg:
@@ -419,7 +421,8 @@ public class HiveMetaStoreCache {
InputFormat<?, ?> inputFormat =
HiveUtil.getInputFormat(jobConf, key.inputFormat, false);
// TODO: This is a temp config, will remove it after the
HiveSplitter is stable.
if (key.useSelfSplitter) {
- result = getFileCache(finalLocation, inputFormat, jobConf,
key.getPartitionValues());
+ result = getFileCache(finalLocation, inputFormat, jobConf,
+ key.getPartitionValues(), key.bindBrokerName);
} else {
InputSplit[] splits;
String remoteUser =
jobConf.get(HdfsResource.HADOOP_USER_NAME);
@@ -498,23 +501,23 @@ public class HiveMetaStoreCache {
}
public List<FileCacheValue>
getFilesByPartitionsWithCache(List<HivePartition> partitions,
- boolean useSelfSplitter) {
- return getFilesByPartitions(partitions, useSelfSplitter, true);
+ boolean useSelfSplitter, String bindBrokerName) {
+ return getFilesByPartitions(partitions, useSelfSplitter, true,
bindBrokerName);
}
public List<FileCacheValue>
getFilesByPartitionsWithoutCache(List<HivePartition> partitions,
- boolean useSelfSplitter) {
- return getFilesByPartitions(partitions, useSelfSplitter, false);
+ boolean useSelfSplitter, String bindBrokerName) {
+ return getFilesByPartitions(partitions, useSelfSplitter, false,
bindBrokerName);
}
private List<FileCacheValue> getFilesByPartitions(List<HivePartition>
partitions,
- boolean useSelfSplitter, boolean withCache) {
+ boolean useSelfSplitter, boolean withCache, String bindBrokerName)
{
long start = System.currentTimeMillis();
List<FileCacheKey> keys = partitions.stream().map(p -> {
FileCacheKey fileCacheKey = p.isDummyPartition()
? FileCacheKey.createDummyCacheKey(p.getDbName(),
p.getTblName(), p.getPath(),
- p.getInputFormat(), useSelfSplitter)
- : new FileCacheKey(p.getPath(), p.getInputFormat(),
p.getPartitionValues());
+ p.getInputFormat(), useSelfSplitter, bindBrokerName)
+ : new FileCacheKey(p.getPath(), p.getInputFormat(),
p.getPartitionValues(), bindBrokerName);
fileCacheKey.setUseSelfSplitter(useSelfSplitter);
return fileCacheKey;
}).collect(Collectors.toList());
@@ -592,7 +595,7 @@ public class HiveMetaStoreCache {
HivePartition partition = partitionCache.getIfPresent(partKey);
if (partition != null) {
fileCacheRef.get().invalidate(new
FileCacheKey(partition.getPath(),
- null, partition.getPartitionValues()));
+ null, partition.getPartitionValues(), null));
partitionCache.invalidate(partKey);
}
}
@@ -610,7 +613,7 @@ public class HiveMetaStoreCache {
* and FE will exit if some network problems occur.
* */
FileCacheKey fileCacheKey = FileCacheKey.createDummyCacheKey(
- dbName, tblName, null, null, false);
+ dbName, tblName, null, null, false, null);
fileCacheRef.get().invalidate(fileCacheKey);
}
}
@@ -625,7 +628,7 @@ public class HiveMetaStoreCache {
HivePartition partition = partitionCache.getIfPresent(partKey);
if (partition != null) {
fileCacheRef.get().invalidate(new
FileCacheKey(partition.getPath(),
- null, partition.getPartitionValues()));
+ null, partition.getPartitionValues(), null));
partitionCache.invalidate(partKey);
}
}
@@ -771,7 +774,7 @@ public class HiveMetaStoreCache {
}
public List<FileCacheValue> getFilesByTransaction(List<HivePartition>
partitions, ValidWriteIdList validWriteIds,
- boolean isFullAcid, long tableId) {
+ boolean isFullAcid, long tableId, String bindBrokerName) {
List<FileCacheValue> fileCacheValues = Lists.newArrayList();
String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
try {
@@ -802,7 +805,8 @@ public class HiveMetaStoreCache {
String acidVersionPath = new Path(baseOrDeltaPath,
"_orc_acid_version").toUri().toString();
RemoteFileSystem fs =
Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new FileSystemCache.FileSystemCacheKey(
-
FileSystemFactory.getFSIdentity(baseOrDeltaPath.toUri().toString()), jobConf));
+
FileSystemFactory.getFSIdentity(baseOrDeltaPath.toUri().toString(),
+ bindBrokerName), jobConf,
bindBrokerName));
Status status = fs.exists(acidVersionPath);
if (status != Status.OK) {
if (status.getErrCode() == ErrCode.NOT_FOUND) {
@@ -823,7 +827,9 @@ public class HiveMetaStoreCache {
for (AcidUtils.ParsedDelta delta :
directory.getCurrentDirectories()) {
String location = delta.getPath().toString();
RemoteFileSystem fs =
Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
- new
FileSystemCache.FileSystemCacheKey(FileSystemFactory.getFSIdentity(location),
jobConf));
+ new FileSystemCache.FileSystemCacheKey(
+ FileSystemFactory.getFSIdentity(location,
bindBrokerName),
+ jobConf, bindBrokerName));
RemoteFiles locatedFiles = fs.listLocatedFiles(location,
true, false);
if (delta.isDeleteDelta()) {
List<String> deleteDeltaFileNames =
locatedFiles.files().stream().map(f -> f.getName()).filter(
@@ -841,7 +847,9 @@ public class HiveMetaStoreCache {
if (directory.getBaseDirectory() != null) {
String location = directory.getBaseDirectory().toString();
RemoteFileSystem fs =
Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
- new
FileSystemCache.FileSystemCacheKey(FileSystemFactory.getFSIdentity(location),
jobConf));
+ new FileSystemCache.FileSystemCacheKey(
+ FileSystemFactory.getFSIdentity(location,
bindBrokerName),
+ jobConf, bindBrokerName));
RemoteFiles locatedFiles = fs.listLocatedFiles(location,
true, false);
locatedFiles.files().stream().filter(
f ->
f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
@@ -939,6 +947,8 @@ public class HiveMetaStoreCache {
private String location;
// not in key
private String inputFormat;
+ // Broker name for file split and file scan.
+ private String bindBrokerName;
// Temp variable, use self file splitter or use InputFormat.getSplits.
// Will remove after self splitter is stable.
private boolean useSelfSplitter;
@@ -947,16 +957,18 @@ public class HiveMetaStoreCache {
// partitionValues would be ["part1", "part2"]
protected List<String> partitionValues;
- public FileCacheKey(String location, String inputFormat, List<String>
partitionValues) {
+ public FileCacheKey(String location, String inputFormat, List<String>
partitionValues, String bindBrokerName) {
this.location = location;
this.inputFormat = inputFormat;
this.partitionValues = partitionValues == null ?
Lists.newArrayList() : partitionValues;
this.useSelfSplitter = true;
+ this.bindBrokerName = bindBrokerName;
}
public static FileCacheKey createDummyCacheKey(String dbName, String
tblName, String location,
- String inputFormat,
boolean useSelfSplitter) {
- FileCacheKey fileCacheKey = new FileCacheKey(location,
inputFormat, null);
+ String inputFormat,
boolean useSelfSplitter,
+ String bindBrokerName) {
+ FileCacheKey fileCacheKey = new FileCacheKey(location,
inputFormat, null, bindBrokerName);
fileCacheKey.dummyKey = dbName + "." + tblName;
fileCacheKey.useSelfSplitter = useSelfSplitter;
return fileCacheKey;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java
index 4bf01910f82..deb048b5943 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java
@@ -24,6 +24,8 @@ import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.fs.FileSystemFactory;
+import org.apache.doris.fs.remote.BrokerFileSystem;
+import org.apache.doris.fs.remote.RemoteFileSystem;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -188,12 +190,17 @@ public final class HiveUtil {
}
}
- public static boolean isSplittable(InputFormat<?, ?> inputFormat, Path
path, JobConf jobConf) {
+ public static boolean isSplittable(RemoteFileSystem remoteFileSystem,
InputFormat<?, ?> inputFormat,
+ String location, JobConf jobConf)
throws UserException {
+ if (remoteFileSystem instanceof BrokerFileSystem) {
+ return ((BrokerFileSystem) remoteFileSystem)
+ .isSplittable(location,
inputFormat.getClass().getCanonicalName());
+ }
+
// ORC uses a custom InputFormat but is always splittable
if (inputFormat.getClass().getSimpleName().equals("OrcInputFormat")) {
return true;
}
-
// use reflection to get isSplitable method on FileInputFormat
// ATTN: the method name is actually "isSplitable", but the correct
spelling is "isSplittable"
Method method = null;
@@ -209,6 +216,7 @@ public final class HiveUtil {
if (method == null) {
return false;
}
+ Path path = new Path(location);
try {
method.setAccessible(true);
return (boolean) method.invoke(inputFormat,
FileSystemFactory.getNativeByPath(path, jobConf), path);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java
b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java
index edc746ebe24..7946dd5e8a7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java
@@ -53,7 +53,7 @@ public class FileSystemCache {
}
private RemoteFileSystem loadFileSystem(FileSystemCacheKey key) {
- return FileSystemFactory.getByType(key.type, key.conf);
+ return FileSystemFactory.getRemoteFileSystem(key.type, key.conf,
key.bindBrokerName);
}
public RemoteFileSystem getRemoteFileSystem(FileSystemCacheKey key) {
@@ -69,11 +69,13 @@ public class FileSystemCache {
// eg: hdfs://nameservices1
private final String fsIdent;
private final JobConf conf;
+ private final String bindBrokerName;
- public FileSystemCacheKey(Pair<FileSystemType, String> fs, JobConf
conf) {
+ public FileSystemCacheKey(Pair<FileSystemType, String> fs, JobConf
conf, String bindBrokerName) {
this.type = fs.first;
this.fsIdent = fs.second;
this.conf = conf;
+ this.bindBrokerName = bindBrokerName;
}
@Override
@@ -84,14 +86,21 @@ public class FileSystemCache {
if (!(obj instanceof FileSystemCacheKey)) {
return false;
}
- return type.equals(((FileSystemCacheKey) obj).type)
+ boolean equalsWithoutBroker = type.equals(((FileSystemCacheKey)
obj).type)
&& fsIdent.equals(((FileSystemCacheKey) obj).fsIdent)
&& conf == ((FileSystemCacheKey) obj).conf;
+ if (bindBrokerName == null) {
+ return equalsWithoutBroker;
+ }
+ return equalsWithoutBroker &&
bindBrokerName.equals(((FileSystemCacheKey) obj).bindBrokerName);
}
@Override
public int hashCode() {
- return Objects.hash(conf, fsIdent, type);
+ if (bindBrokerName == null) {
+ return Objects.hash(conf, fsIdent, type);
+ }
+ return Objects.hash(conf, fsIdent, type, bindBrokerName);
}
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java
b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java
index 3837a7eb95b..e54a73bbff3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java
@@ -56,9 +56,11 @@ public class FileSystemFactory {
}
}
- public static Pair<FileSystemType, String> getFSIdentity(String location) {
+ public static Pair<FileSystemType, String> getFSIdentity(String location,
String bindBrokerName) {
FileSystemType fsType;
- if (S3Util.isObjStorage(location)) {
+ if (bindBrokerName != null) {
+ fsType = FileSystemType.BROKER;
+ } else if (S3Util.isObjStorage(location)) {
if (S3Util.isHdfsOnOssEndpoint(location)) {
// if hdfs service is enabled on oss, use hdfs lib to access
oss.
fsType = FileSystemType.DFS;
@@ -83,7 +85,8 @@ public class FileSystemFactory {
return Pair.of(fsType, fsIdent);
}
- public static RemoteFileSystem getByType(FileSystemType type,
Configuration conf) {
+ public static RemoteFileSystem getRemoteFileSystem(FileSystemType type,
Configuration conf,
+ String bindBrokerName) {
Map<String, String> properties = new HashMap<>();
conf.iterator().forEachRemaining(e -> properties.put(e.getKey(),
e.getValue()));
switch (type) {
@@ -95,6 +98,8 @@ public class FileSystemFactory {
return new OFSFileSystem(properties);
case JFS:
return new JFSFileSystem(properties);
+ case BROKER:
+ return new BrokerFileSystem(bindBrokerName, properties);
default:
throw new IllegalStateException("Not supported file system
type: " + type);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemType.java
b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemType.java
index 5ddea011744..018130f0c14 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemType.java
@@ -22,5 +22,6 @@ public enum FileSystemType {
DFS,
OFS,
JFS,
+ BROKER,
FILE
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java
index cb871509288..ef8d484bda9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java
@@ -24,8 +24,10 @@ import org.apache.doris.catalog.FsBroker;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.datasource.property.PropertyConverter;
+import org.apache.doris.fs.RemoteFiles;
import org.apache.doris.fs.operations.BrokerFileOperations;
import org.apache.doris.fs.operations.OpParams;
import org.apache.doris.service.FrontendOptions;
@@ -34,6 +36,8 @@ import org.apache.doris.thrift.TBrokerCheckPathExistResponse;
import org.apache.doris.thrift.TBrokerDeletePathRequest;
import org.apache.doris.thrift.TBrokerFD;
import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TBrokerIsSplittableRequest;
+import org.apache.doris.thrift.TBrokerIsSplittableResponse;
import org.apache.doris.thrift.TBrokerListPathRequest;
import org.apache.doris.thrift.TBrokerListResponse;
import org.apache.doris.thrift.TBrokerOperationStatus;
@@ -65,6 +69,7 @@ import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
@@ -556,6 +561,88 @@ public class BrokerFileSystem extends RemoteFileSystem {
return Status.OK;
}
+ @Override
+ public RemoteFiles listLocatedFiles(String remotePath, boolean onlyFiles,
boolean recursive) throws UserException {
+ // get a proper broker
+ Pair<TPaloBrokerService.Client, TNetworkAddress> pair = getBroker();
+ if (pair == null) {
+ throw new UserException("failed to get broker client");
+ }
+ TPaloBrokerService.Client client = pair.first;
+ TNetworkAddress address = pair.second;
+
+ // invoke broker 'listLocatedFiles' interface
+ boolean needReturn = true;
+ try {
+ TBrokerListPathRequest req = new
TBrokerListPathRequest(TBrokerVersion.VERSION_ONE, remotePath,
+ recursive, properties);
+ req.setOnlyFiles(onlyFiles);
+ TBrokerListResponse response = client.listLocatedFiles(req);
+ TBrokerOperationStatus operationStatus = response.getOpStatus();
+ if (operationStatus.getStatusCode() !=
TBrokerOperationStatusCode.OK) {
+ throw new UserException("failed to listLocatedFiles, remote
path: " + remotePath + ". msg: "
+ + operationStatus.getMessage() + ", broker: " +
BrokerUtil.printBroker(name, address));
+ }
+ List<RemoteFile> result = new ArrayList<>();
+ List<TBrokerFileStatus> fileStatus = response.getFiles();
+ for (TBrokerFileStatus tFile : fileStatus) {
+ org.apache.hadoop.fs.Path path = new
org.apache.hadoop.fs.Path(tFile.path);
+ RemoteFile file = new RemoteFile(path.getName(), path,
!tFile.isDir, tFile.isDir, tFile.size,
+ tFile.getBlockSize(), tFile.getModificationTime(),
null /* blockLocations is null*/);
+ result.add(file);
+ }
+ LOG.info("finished to listLocatedFiles, remote path {}. get files:
{}", remotePath, result);
+ return new RemoteFiles(result);
+ } catch (TException e) {
+ needReturn = false;
+ throw new UserException("failed to listLocatedFiles, remote path: "
+ + remotePath + ". msg: " + e.getMessage() + ", broker: " +
BrokerUtil.printBroker(name, address));
+ } finally {
+ if (needReturn) {
+ ClientPool.brokerPool.returnObject(address, client);
+ } else {
+ ClientPool.brokerPool.invalidateObject(address, client);
+ }
+ }
+ }
+
+ public boolean isSplittable(String remotePath, String inputFormat) throws
UserException {
+ // get a proper broker
+ Pair<TPaloBrokerService.Client, TNetworkAddress> pair = getBroker();
+ if (pair == null) {
+ throw new UserException("failed to get broker client");
+ }
+ TPaloBrokerService.Client client = pair.first;
+ TNetworkAddress address = pair.second;
+
+ // invoke 'isSplittable' interface
+ boolean needReturn = true;
+ try {
+ TBrokerIsSplittableRequest req = new
TBrokerIsSplittableRequest().setVersion(TBrokerVersion.VERSION_ONE)
+
.setPath(remotePath).setInputFormat(inputFormat).setProperties(properties);
+ TBrokerIsSplittableResponse response = client.isSplittable(req);
+ TBrokerOperationStatus operationStatus = response.getOpStatus();
+ if (operationStatus.getStatusCode() !=
TBrokerOperationStatusCode.OK) {
+ throw new UserException("failed to get path isSplittable,
remote path: " + remotePath + ". msg: "
+ + operationStatus.getMessage() + ", broker: " +
BrokerUtil.printBroker(name, address));
+ }
+ boolean result = response.isSplittable();
+ LOG.info("finished to get path isSplittable, remote path {} with
format {}, isSplittable: {}",
+ remotePath, inputFormat, result);
+ return result;
+ } catch (TException e) {
+ needReturn = false;
+ throw new UserException("failed to get path isSplittable, remote
path: "
+ + remotePath + ". msg: " + e.getMessage() + ", broker: " +
BrokerUtil.printBroker(name, address));
+ } finally {
+ if (needReturn) {
+ ClientPool.brokerPool.returnObject(address, client);
+ } else {
+ ClientPool.brokerPool.invalidateObject(address, client);
+ }
+ }
+ }
+
// List files in remotePath
@Override
public Status list(String remotePath, List<RemoteFile> result, boolean
fileNameOnly) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
index 1ba77fa5f9c..d4141810487 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
@@ -196,8 +196,14 @@ public class HiveScanNode extends FileQueryScanNode {
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMetaStoreCache((HMSExternalCatalog)
hmsTable.getCatalog());
boolean useSelfSplitter = hmsTable.getCatalog().useSelfSplitter();
+ String bindBrokerName = hmsTable.getCatalog().bindBrokerName();
+ if (bindBrokerName != null && useSelfSplitter == false) {
+ // useSelfSplitter must be true if bindBrokerName is set.
+ throw new
UserException(HMSExternalCatalog.ENABLE_SELF_SPLITTER + " should be true if "
+ + HMSExternalCatalog.BIND_BROKER_NAME + " is set");
+ }
List<Split> allFiles = Lists.newArrayList();
- getFileSplitByPartitions(cache, getPartitions(), allFiles,
useSelfSplitter);
+ getFileSplitByPartitions(cache, getPartitions(), allFiles,
useSelfSplitter, bindBrokerName);
LOG.debug("get #{} files for table: {}.{}, cost: {} ms",
allFiles.size(), hmsTable.getDbName(), hmsTable.getName(),
(System.currentTimeMillis() - start));
return allFiles;
@@ -210,12 +216,13 @@ public class HiveScanNode extends FileQueryScanNode {
}
private void getFileSplitByPartitions(HiveMetaStoreCache cache,
List<HivePartition> partitions,
- List<Split> allFiles, boolean
useSelfSplitter) throws IOException {
+ List<Split> allFiles, boolean
useSelfSplitter,
+ String bindBrokerName) throws
IOException {
List<FileCacheValue> fileCaches;
if (hiveTransaction != null) {
- fileCaches = getFileSplitByTransaction(cache, partitions);
+ fileCaches = getFileSplitByTransaction(cache, partitions,
bindBrokerName);
} else {
- fileCaches = cache.getFilesByPartitionsWithCache(partitions,
useSelfSplitter);
+ fileCaches = cache.getFilesByPartitionsWithCache(partitions,
useSelfSplitter, bindBrokerName);
}
if (ConnectContext.get().getExecutor() != null) {
ConnectContext.get().getExecutor().getSummaryProfile().setGetPartitionFilesFinishTime();
@@ -287,7 +294,8 @@ public class HiveScanNode extends FileQueryScanNode {
return fileList.subList(0, index);
}
- private List<FileCacheValue> getFileSplitByTransaction(HiveMetaStoreCache
cache, List<HivePartition> partitions) {
+ private List<FileCacheValue> getFileSplitByTransaction(HiveMetaStoreCache
cache, List<HivePartition> partitions,
+ String
bindBrokerName) {
for (HivePartition partition : partitions) {
if (partition.getPartitionValues() == null ||
partition.getPartitionValues().isEmpty()) {
// this is unpartitioned table.
@@ -297,7 +305,8 @@ public class HiveScanNode extends FileQueryScanNode {
}
ValidWriteIdList validWriteIds = hiveTransaction.getValidWriteIds(
((HMSExternalCatalog) hmsTable.getCatalog()).getClient());
- return cache.getFilesByTransaction(partitions, validWriteIds,
hiveTransaction.isFullAcid(), hmsTable.getId());
+ return cache.getFilesByTransaction(partitions, validWriteIds,
+ hiveTransaction.isFullAcid(), hmsTable.getId(), bindBrokerName);
}
@Override
@@ -319,6 +328,10 @@ public class HiveScanNode extends FileQueryScanNode {
@Override
protected TFileType getLocationType(String location) throws UserException {
+ String bindBrokerName = hmsTable.getCatalog().bindBrokerName();
+ if (bindBrokerName != null) {
+ return TFileType.FILE_BROKER;
+ }
return getTFileType(location).orElseThrow(() ->
new DdlException("Unknown file location " + location + " for hms
table " + hmsTable.getName()));
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index 927e556c91a..f2c77026312 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -702,7 +702,8 @@ public class StatisticsUtil {
table.getRemoteTable().getSd().getLocation(), null));
}
// Get files for all partitions.
- return cache.getFilesByPartitionsWithoutCache(hivePartitions, true);
+ String bindBrokerName = table.getCatalog().bindBrokerName();
+ return cache.getFilesByPartitionsWithoutCache(hivePartitions, true,
bindBrokerName);
}
/**
diff --git a/fs_brokers/apache_hdfs_broker/pom.xml
b/fs_brokers/apache_hdfs_broker/pom.xml
index bbd58e5d5d4..2cb8d892dee 100644
--- a/fs_brokers/apache_hdfs_broker/pom.xml
+++ b/fs_brokers/apache_hdfs_broker/pom.xml
@@ -69,9 +69,10 @@ under the License.
<maven.compiler.target>1.8</maven.compiler.target>
<log4j2.version>2.18.0</log4j2.version>
<project.scm.id>github</project.scm.id>
- <hadoop.version>2.10.2</hadoop.version>
+ <hadoop.version>3.3.6</hadoop.version>
<netty.version>4.1.65.Final</netty.version>
<gcs.version>hadoop2-2.2.15</gcs.version>
+
<doris.hive.catalog.shade.version>1.0.1</doris.hive.catalog.shade.version>
</properties>
<profiles>
<!-- for custom internal repository -->
@@ -224,6 +225,29 @@ under the License.
</exclusion>
</exclusions>
</dependency>
+ <!--
https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core
-->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <!--
https://mvnrepository.com/artifact/org.apache.doris/hive-catalog-shade -->
+ <dependency>
+ <groupId>org.apache.doris</groupId>
+ <artifactId>hive-catalog-shade</artifactId>
+ <version>${doris.hive.catalog.shade.version}</version>
+ </dependency>
+ <!--
https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind
-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
diff --git
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
index 55ba457404a..22be82e34c2 100644
---
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
+++
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
@@ -25,17 +25,19 @@ import org.apache.doris.thrift.TBrokerOperationStatusCode;
import com.google.common.base.Strings;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.log4j.Logger;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
import java.io.File;
import java.io.FileNotFoundException;
@@ -1061,6 +1063,49 @@ public class FileSystemManager {
}
}
+ public List<TBrokerFileStatus> listLocatedFiles(String path, boolean
onlyFiles,
+ boolean recursive,
Map<String, String> properties) {
+ List<TBrokerFileStatus> resultFileStatus = null;
+ BrokerFileSystem fileSystem = getFileSystem(path, properties);
+ Path locatedPath = new Path(path);
+ try {
+ FileSystem innerFileSystem = fileSystem.getDFSFileSystem();
+ RemoteIterator<LocatedFileStatus> locatedFiles = onlyFiles ?
innerFileSystem.listFiles(locatedPath, recursive)
+ : innerFileSystem.listLocatedStatus(locatedPath);
+ return getFileLocations(locatedFiles);
+ } catch (FileNotFoundException e) {
+ logger.info("file not found: " + e.getMessage());
+ throw new
BrokerException(TBrokerOperationStatusCode.FILE_NOT_FOUND,
+ e, "file not found");
+ } catch (Exception e) {
+ logger.error("errors while get file status ", e);
+ fileSystem.closeFileSystem();
+ throw new
BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR,
+ e, "unknown error when listLocatedFiles");
+ }
+ }
+
+ private List<TBrokerFileStatus>
getFileLocations(RemoteIterator<LocatedFileStatus> locatedFiles) throws
IOException {
+ List<TBrokerFileStatus> locations = new ArrayList<>();
+ while (locatedFiles.hasNext()) {
+ LocatedFileStatus fileStatus = locatedFiles.next();
+ TBrokerFileStatus brokerFileStatus = new TBrokerFileStatus();
+ brokerFileStatus.setPath(fileStatus.getPath().toString());
+ brokerFileStatus.setIsDir(fileStatus.isDirectory());
+ if (fileStatus.isDirectory()) {
+ brokerFileStatus.setIsSplitable(false);
+ brokerFileStatus.setSize(-1);
+ } else {
+ brokerFileStatus.setSize(fileStatus.getLen());
+ brokerFileStatus.setIsSplitable(true);
+ }
+
brokerFileStatus.setModificationTime(fileStatus.getModificationTime());
+ brokerFileStatus.setBlockSize(fileStatus.getBlockSize());
+ locations.add(brokerFileStatus);
+ }
+ return locations;
+ }
+
public List<TBrokerFileStatus> listPath(String path, boolean fileNameOnly,
Map<String, String> properties) {
List<TBrokerFileStatus> resultFileStatus = null;
WildcardURI pathUri = new WildcardURI(path);
@@ -1282,13 +1327,7 @@ public class FileSystemManager {
FSDataOutputStream fsDataOutputStream =
clientContextManager.getFsDataOutputStream(fd);
synchronized (fsDataOutputStream) {
long currentStreamOffset;
- try {
- currentStreamOffset = fsDataOutputStream.getPos();
- } catch (IOException e) {
- logger.error("errors while get file pos from output stream",
e);
- throw new
BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR,
- "errors while get file pos from output stream");
- }
+ currentStreamOffset = fsDataOutputStream.getPos();
if (currentStreamOffset != offset) {
// it's ok, it means that last pwrite succeed finally
if (currentStreamOffset == offset + data.length) {
diff --git
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/HDFSBrokerServiceImpl.java
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/HDFSBrokerServiceImpl.java
index 14ff74dd41e..816462ecb34 100644
---
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/HDFSBrokerServiceImpl.java
+++
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/HDFSBrokerServiceImpl.java
@@ -19,6 +19,7 @@ package org.apache.doris.broker.hdfs;
import com.google.common.base.Stopwatch;
import org.apache.doris.common.BrokerPerfMonitor;
+import org.apache.doris.common.HiveUtils;
import org.apache.doris.thrift.TBrokerCheckPathExistRequest;
import org.apache.doris.thrift.TBrokerCheckPathExistResponse;
import org.apache.doris.thrift.TBrokerCloseReaderRequest;
@@ -28,6 +29,8 @@ import org.apache.doris.thrift.TBrokerFD;
import org.apache.doris.thrift.TBrokerFileSizeRequest;
import org.apache.doris.thrift.TBrokerFileSizeResponse;
import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TBrokerIsSplittableResponse;
+import org.apache.doris.thrift.TBrokerIsSplittableRequest;
import org.apache.doris.thrift.TBrokerListPathRequest;
import org.apache.doris.thrift.TBrokerListResponse;
import org.apache.doris.thrift.TBrokerOpenReaderRequest;
@@ -86,6 +89,47 @@ public class HDFSBrokerServiceImpl implements
TPaloBrokerService.Iface {
}
}
+    /**
+     * Handles a listLocatedFiles request by delegating to the file system manager.
+     *
+     * @param request carries the path, the recursion flag, the optional onlyFiles flag and
+     *                the file system properties
+     * @return a response holding an OK status and the listed files, or only the failure
+     *         status when the manager raised a {@code BrokerException}
+     */
+    @Override
+    public TBrokerListResponse listLocatedFiles(TBrokerListPathRequest request)
+            throws TException {
+        logger.info("received a listLocatedFiles request, request detail: " + request);
+        TBrokerListResponse response = new TBrokerListResponse();
+        try {
+            // onlyFiles is an optional field: treat it as false when the client did not set it
+            boolean filesOnly = request.isSetOnlyFiles() && request.isOnlyFiles();
+            List<TBrokerFileStatus> located = fileSystemManager.listLocatedFiles(
+                    request.path, filesOnly, request.isIsRecursive(), request.properties);
+            response.setOpStatus(generateOKStatus());
+            response.setFiles(located);
+        } catch (BrokerException e) {
+            logger.warn("failed to list path: " + request.path, e);
+            response.setOpStatus(e.generateFailedOperationStatus());
+        }
+        return response;
+    }
+
+    /**
+     * Handles an isSplittable request by delegating to {@code HiveUtils.isSplittable}.
+     *
+     * @param request carries the path, the input format name and the configuration properties
+     * @return a response holding an OK status and the splittable flag, or only the failure
+     *         status when the check raised a {@code BrokerException}
+     */
+    @Override
+    public TBrokerIsSplittableResponse isSplittable(TBrokerIsSplittableRequest request)
+            throws TException {
+        logger.info("received a isSplittable request, request detail: " + request);
+        TBrokerIsSplittableResponse response = new TBrokerIsSplittableResponse();
+        try {
+            boolean splittable = HiveUtils.isSplittable(
+                    request.path, request.inputFormat, request.properties);
+            response.setOpStatus(generateOKStatus());
+            response.setSplittable(splittable);
+        } catch (BrokerException e) {
+            logger.warn("failed to get isSplitable with path: " + request.path, e);
+            response.setOpStatus(e.generateFailedOperationStatus());
+        }
+        return response;
+    }
+
@Override
public TBrokerOperationStatus deletePath(TBrokerDeletePathRequest request)
throws TException {
diff --git
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/common/HiveUtils.java
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/common/HiveUtils.java
new file mode 100644
index 00000000000..f2211eb2026
--- /dev/null
+++
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/common/HiveUtils.java
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common;
+
+import org.apache.doris.broker.hdfs.BrokerException;
+import org.apache.doris.thrift.TBrokerOperationStatusCode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Map;
+
+/**
+ * Broker-side helpers that answer whether a file read through a given Hive input format
+ * can be split.
+ */
+public class HiveUtils {
+    private static final Logger logger = Logger.getLogger(HiveUtils.class.getName());
+
+    /**
+     * Tells whether {@code path} is splittable when read with the named input format.
+     *
+     * @param path            the file path to check
+     * @param inputFormatName class name of the Hive input format
+     * @param properties      configuration entries copied into the Hadoop configuration;
+     *                        may be null, in which case no entries are copied
+     * @return true when the input format reports the path as splittable, false when it
+     *         reports otherwise or declares no {@code isSplitable} method at all
+     * @throws BrokerException with {@code INVALID_ARGUMENT} when the path is missing or the
+     *                         input format cannot be created, or with
+     *                         {@code TARGET_STORAGE_SERVICE_ERROR} when the check itself fails
+     */
+    public static boolean isSplittable(String path, String inputFormatName,
+                                       Map<String, String> properties) throws BrokerException {
+        // path is an optional thrift field; fail with a broker status instead of the
+        // runtime exception that building a Path from null would raise
+        if (path == null) {
+            throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
+                    "path is not set");
+        }
+        JobConf jobConf = getJobConf(properties);
+        InputFormat<?, ?> inputFormat = getInputFormat(jobConf, inputFormatName);
+        return isSplittableInternal(inputFormat, new Path(path), jobConf);
+    }
+
+    /**
+     * Builds a {@code JobConf} from a fresh Hadoop configuration plus the given entries.
+     */
+    private static JobConf getJobConf(Map<String, String> properties) {
+        Configuration configuration = new Configuration();
+        // properties is an optional thrift field and may arrive as null
+        if (properties != null) {
+            for (Map.Entry<String, String> entry : properties.entrySet()) {
+                configuration.set(entry.getKey(), entry.getValue());
+            }
+        }
+        return new JobConf(configuration);
+    }
+
+    /**
+     * Instantiates the input format named by {@code inputFormatName}.
+     *
+     * @throws BrokerException with {@code INVALID_ARGUMENT} when the class cannot be found
+     *                         or instantiated
+     */
+    private static InputFormat<?, ?> getInputFormat(JobConf jobConf, String inputFormatName)
+            throws BrokerException {
+        try {
+            Class<? extends InputFormat<?, ?>> inputFormatClass =
+                    getInputFormatClass(jobConf, inputFormatName);
+            if (inputFormatClass == SymlinkTextInputFormat.class) {
+                // symlink targets are always TextInputFormat
+                inputFormatClass = TextInputFormat.class;
+            }
+
+            return ReflectionUtils.newInstance(inputFormatClass, jobConf);
+        } catch (ClassNotFoundException | RuntimeException e) {
+            // pass the exception in the cause position so it is kept as the cause
+            throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
+                    e, "Unable to create input format " + inputFormatName);
+        }
+    }
+
+    /**
+     * Resolves the input format class for {@code inputFormatName}, mapping the alternative
+     * Parquet class names onto {@code MapredParquetInputFormat}.
+     */
+    @SuppressWarnings({"unchecked", "RedundantCast"})
+    private static Class<? extends InputFormat<?, ?>> getInputFormatClass(JobConf conf, String inputFormatName)
+            throws ClassNotFoundException {
+        // CDH uses different names for Parquet
+        if ("parquet.hive.DeprecatedParquetInputFormat".equals(inputFormatName)
+                || "parquet.hive.MapredParquetInputFormat".equals(inputFormatName)) {
+            return MapredParquetInputFormat.class;
+        }
+
+        Class<?> clazz = conf.getClassByName(inputFormatName);
+        return (Class<? extends InputFormat<?, ?>>) clazz.asSubclass(InputFormat.class);
+    }
+
+    /**
+     * Invokes the input format's {@code isSplitable(FileSystem, Path)} method reflectively.
+     *
+     * @return true for {@code OrcInputFormat}; false when no {@code isSplitable} method is
+     *         declared anywhere in the class hierarchy; the method's result otherwise
+     * @throws BrokerException with {@code TARGET_STORAGE_SERVICE_ERROR} when the file system
+     *                         cannot be obtained or the reflective call fails
+     */
+    private static boolean isSplittableInternal(InputFormat<?, ?> inputFormat, Path path, JobConf jobConf)
+            throws BrokerException {
+        // ORC uses a custom InputFormat but is always splittable
+        if ("OrcInputFormat".equals(inputFormat.getClass().getSimpleName())) {
+            return true;
+        }
+
+        // use reflection to get isSplittable method on FileInputFormat
+        Method method = null;
+        for (Class<?> clazz = inputFormat.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
+            try {
+                method = clazz.getDeclaredMethod("isSplitable", FileSystem.class, Path.class);
+                break;
+            } catch (NoSuchMethodException ignored) {
+                // Expected while walking up the hierarchy: the method is usually declared
+                // on a superclass, so this is logged at debug rather than warn.
+                logger.debug(LoggerMessageFormat.format("Class {} doesn't contain isSplitable method", clazz));
+            }
+        }
+
+        if (method == null) {
+            return false;
+        }
+        try {
+            method.setAccessible(true);
+            return (boolean) method.invoke(inputFormat, path.getFileSystem(jobConf), path);
+        } catch (InvocationTargetException | IllegalAccessException | IOException e) {
+            // Report a broker status: the service handler only catches BrokerException.
+            throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR,
+                    e, "Unable to invoke isSplitable on input format " + inputFormat.getClass().getName());
+        }
+    }
+}
diff --git a/gensrc/thrift/PaloBrokerService.thrift
b/gensrc/thrift/PaloBrokerService.thrift
index 308c606544e..e4bc60a201f 100644
--- a/gensrc/thrift/PaloBrokerService.thrift
+++ b/gensrc/thrift/PaloBrokerService.thrift
@@ -91,12 +91,25 @@ struct TBrokerCheckPathExistResponse {
2: required bool isPathExist;
}
+// Response of TPaloBrokerService.isSplittable.
+struct TBrokerIsSplittableResponse {
+ // OK on success; otherwise the failure status generated by the broker
+ 1: optional TBrokerOperationStatus opStatus;
+ // whether the requested path is splittable; the HDFS broker sets it only on success
+ 2: optional bool splittable;
+}
+
struct TBrokerListPathRequest {
1: required TBrokerVersion version;
2: required string path;
3: required bool isRecursive;
4: required map<string,string> properties;
5: optional bool fileNameOnly;
+ // read by listLocatedFiles in the HDFS broker: treated as false when unset; when true
+ // the listing uses Hadoop's listFiles and isRecursive is forwarded to it
+ 6: optional bool onlyFiles;
+}
+
+// Request of TPaloBrokerService.isSplittable.
+struct TBrokerIsSplittableRequest {
+ 1: optional TBrokerVersion version;
+ // path of the file to check
+ 2: optional string path;
+ // class name of the input format used to read the path
+ 3: optional string inputFormat;
+ // configuration entries; the HDFS broker copies them into the Hadoop configuration
+ // it uses for the check
+ 4: optional map<string,string> properties;
}
struct TBrokerDeletePathRequest {
@@ -184,6 +197,13 @@ service TPaloBrokerService {
// return a list of files under a path
TBrokerListResponse listPath(1: TBrokerListPathRequest request);
+
+ // return located files of a given path. A broker implementation should refer to
+ // 'org.apache.doris.fs.remote.RemoteFileSystem#listLocatedFiles' in fe-core.
+ TBrokerListResponse listLocatedFiles(1: TBrokerListPathRequest request);
+
+ // return whether the path with specified input format is splittable.
+ TBrokerIsSplittableResponse isSplittable(1: TBrokerIsSplittableRequest
request);
// delete a file, if the deletion of the file fails, the status code will
return an error message
// input:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]