This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c581855a41 [fix](hive-table) fix bug that hive external table can not query table created by Tez (#11345)
c581855a41 is described below
commit c581855a410343230bbfe6b259c6241774dc2f37
Author: Mingyu Chen <[email protected]>
AuthorDate: Wed Aug 3 09:07:47 2022 +0800
[fix](hive-table) fix bug that hive external table can not query table created by Tez (#11345)
* [fix](hive-table) fix bug that hive external table can not query table created by Tez
If the Hive table is created by Tez, the table location may contain second-level directories, e.g.:
/user/hive/warehouse/region_tmp_union_all/
---/user/hive/warehouse/region_tmp_union_all/1
---/user/hive/warehouse/region_tmp_union_all/2
We should recursively traverse the directory to get the real data files, as sketched in the example below.
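As an illustration only, here is a minimal sketch of that recursive listing idea. The class and method names are hypothetical; in the patch the logic lives in HiveMetaStoreClientHelper.getAllFileStatus and uses Guava's Queues helper instead of a plain ArrayDeque.

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public class RecursiveHdfsListing {
    // Collect all regular files under 'root', descending into subdirectories
    // (e.g. the numeric subdirectories that Tez creates for "union" queries).
    public static List<LocatedFileStatus> listFilesRecursively(Path root, Configuration conf) throws IOException {
        List<LocatedFileStatus> files = new ArrayList<>();
        Queue<RemoteIterator<LocatedFileStatus>> queue = new ArrayDeque<>();
        FileSystem fs = root.getFileSystem(conf);
        queue.add(fs.listLocatedStatus(root));
        while (!queue.isEmpty()) {
            RemoteIterator<LocatedFileStatus> it = queue.poll();
            while (it.hasNext()) {
                LocatedFileStatus status = it.next();
                if (status.isDirectory()) {
                    // Directory found: enqueue its listing instead of treating it as a data file.
                    queue.add(fs.listLocatedStatus(status.getPath()));
                } else {
                    files.add(status);
                }
            }
        }
        return files;
    }
}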
---
.../doris/catalog/HiveMetaStoreClientHelper.java | 163 ++++++++++++---------
.../planner/external/ExternalFileScanNode.java | 4 +-
.../planner/external/ExternalHiveScanProvider.java | 49 ++++---
3 files changed, 125 insertions(+), 91 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
index a364568ed7..ac15875f0e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
@@ -37,10 +37,12 @@ import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TExprOpcode;
import com.google.common.base.Strings;
+import com.google.common.collect.Queues;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@@ -74,6 +76,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -171,6 +174,7 @@ public class HiveMetaStoreClientHelper {
/**
* Get data files of partitions in hive table, filter by partition predicate.
+ *
* @param hiveTable
* @param hivePartitionPredicate
* @param fileStatuses
@@ -179,25 +183,108 @@ public class HiveMetaStoreClientHelper {
* @throws DdlException
*/
public static String getHiveDataFiles(HiveTable hiveTable, ExprNodeGenericFuncDesc hivePartitionPredicate,
- List<TBrokerFileStatus> fileStatuses,
- Table remoteHiveTbl,
- StorageBackend.StorageType type) throws DdlException {
+ List<TBrokerFileStatus> fileStatuses, Table remoteHiveTbl, StorageBackend.StorageType type)
+ throws DdlException {
+ boolean onS3 = type.equals(StorageBackend.StorageType.S3);
+ Map<String, String> properties = hiveTable.getHiveProperties();
+ Configuration configuration = getConfiguration(properties, onS3);
+ boolean isSecurityEnabled = isSecurityEnabled(properties);
+
List<RemoteIterator<LocatedFileStatus>> remoteIterators;
- Boolean onS3 = type.equals(StorageBackend.StorageType.S3);
if (remoteHiveTbl.getPartitionKeys().size() > 0) {
String metaStoreUris = hiveTable.getHiveProperties().get(HiveTable.HIVE_METASTORE_URIS);
// hive partitioned table, get file iterator from table partition sd info
List<Partition> hivePartitions = getHivePartitions(metaStoreUris, remoteHiveTbl, hivePartitionPredicate);
- remoteIterators = getRemoteIterator(hivePartitions, hiveTable.getHiveProperties(), onS3);
+ remoteIterators = getRemoteIterator(hivePartitions, configuration, isSecurityEnabled, properties, onS3);
} else {
// hive non-partitioned table, get file iterator from table sd info
- remoteIterators = getRemoteIterator(remoteHiveTbl, hiveTable.getHiveProperties(), onS3);
+ remoteIterators = getRemoteIterator(remoteHiveTbl, configuration, isSecurityEnabled, properties, onS3);
+ }
+ return getAllFileStatus(fileStatuses, remoteIterators, configuration, isSecurityEnabled, properties, onS3);
+ }
+
+ // create Configuration for the given properties
+ private static Configuration getConfiguration(Map<String, String> properties, boolean onS3) {
+ Configuration configuration = new Configuration(false);
+ for (Map.Entry<String, String> entry : properties.entrySet()) {
+ if (!entry.getKey().equals(HiveTable.HIVE_METASTORE_URIS)) {
+ configuration.set(entry.getKey(), entry.getValue());
+ }
+ }
+ if (onS3) {
+ setS3Configuration(configuration, properties);
+ }
+ return configuration;
+ }
+
+ // return true if it is kerberos
+ private static boolean isSecurityEnabled(Map<String, String> properties) {
+ for (Map.Entry<String, String> entry : properties.entrySet()) {
+ if (entry.getKey().equals(BrokerUtil.HADOOP_SECURITY_AUTHENTICATION) && entry.getValue()
+ .equals(AuthType.KERBEROS.getDesc())) {
+ return true;
+ }
}
+ return false;
+ }
+ // Get remote iterators for given partitions
+ private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(List<Partition> partitions,
+ Configuration configuration, boolean isSecurityEnabled, Map<String, String> properties, boolean onS3)
+ throws DdlException {
+ List<RemoteIterator<LocatedFileStatus>> allIterators = new ArrayList<>();
+ for (Partition p : partitions) {
+ String location = p.getSd().getLocation();
+ Path path = new Path(location);
+ allIterators.addAll(getRemoteIterator(path, configuration, properties, isSecurityEnabled));
+ }
+ return allIterators;
+ }
+
+ // Get remote iterators for given table
+ private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(Table table, Configuration configuration,
+ boolean isSecurityEnabled, Map<String, String> properties, boolean onS3) throws DdlException {
+ String location = table.getSd().getLocation();
+ Path path = new Path(location);
+ return getRemoteIterator(path, configuration, properties, isSecurityEnabled);
+ }
+
+ // Get remote iterators for given Path
+ private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(org.apache.hadoop.fs.Path path,
+ Configuration conf, Map<String, String> properties, boolean isSecurityEnabled) throws DdlException {
+ List<RemoteIterator<LocatedFileStatus>> iterators = new ArrayList<>();
+ try {
+ if (isSecurityEnabled) {
+ UserGroupInformation.setConfiguration(conf);
+ // login user from keytab
+ UserGroupInformation.loginUserFromKeytab(properties.get(BrokerUtil.HADOOP_KERBEROS_PRINCIPAL),
+ properties.get(BrokerUtil.HADOOP_KERBEROS_KEYTAB));
+ }
+ FileSystem fileSystem = path.getFileSystem(conf);
+ iterators.add(fileSystem.listLocatedStatus(path));
+ } catch (IOException e) {
+ LOG.warn("Get HDFS file remote iterator failed. {}" + e.getMessage());
+ throw new DdlException("Get HDFS file remote iterator failed. Error: " + e.getMessage());
+ }
+ return iterators;
+ }
+
+ private static String getAllFileStatus(List<TBrokerFileStatus> fileStatuses,
+ List<RemoteIterator<LocatedFileStatus>> remoteIterators, Configuration configuration,
+ boolean isSecurityEnabled, Map<String, String> properties, boolean onS3) throws DdlException {
String hdfsUrl = "";
- for (RemoteIterator<LocatedFileStatus> iterator : remoteIterators) {
+ Queue<RemoteIterator<LocatedFileStatus>> queue = Queues.newArrayDeque(remoteIterators);
+ while (queue.peek() != null) {
+ RemoteIterator<LocatedFileStatus> iterator = queue.poll();
try {
while (iterator.hasNext()) {
LocatedFileStatus fileStatus = iterator.next();
+ if (fileStatus.isDirectory()) {
+ // recursive visit the directory to get the file path.
+ queue.addAll(
+ getRemoteIterator(fileStatus.getPath(), configuration, properties, isSecurityEnabled));
+ continue;
+ }
TBrokerFileStatus brokerFileStatus = new TBrokerFileStatus();
brokerFileStatus.setIsDir(fileStatus.isDirectory());
brokerFileStatus.setIsSplitable(true);
@@ -226,7 +313,6 @@ public class HiveMetaStoreClientHelper {
throw new DdlException("List HDFS file failed. Error: " + e.getMessage());
}
}
-
return hdfsUrl;
}
@@ -271,69 +357,6 @@ public class HiveMetaStoreClientHelper {
configuration.set("fs.s3a.attempts.maximum", "2");
}
- private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(
- List<Partition> partitions, Map<String, String> properties, boolean onS3)
- throws DdlException {
- List<RemoteIterator<LocatedFileStatus>> iterators = new ArrayList<>();
- Configuration configuration = new Configuration(false);
- for (Map.Entry<String, String> entry : properties.entrySet()) {
- if (!entry.getKey().equals(HiveTable.HIVE_METASTORE_URIS)) {
- configuration.set(entry.getKey(), entry.getValue());
- }
- }
- if (onS3) {
- setS3Configuration(configuration, properties);
- }
- for (Partition p : partitions) {
- String location = p.getSd().getLocation();
- org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(location);
- try {
- FileSystem fileSystem = path.getFileSystem(configuration);
- iterators.add(fileSystem.listLocatedStatus(path));
- } catch (IOException e) {
- LOG.warn("Get HDFS file remote iterator failed. {}", e.getMessage());
- throw new DdlException("Get HDFS file remote iterator failed. Error: " + e.getMessage());
- }
- }
- return iterators;
- }
-
- private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(
- Table table, Map<String, String> properties, boolean onS3)
- throws DdlException {
- List<RemoteIterator<LocatedFileStatus>> iterators = new ArrayList<>();
- Configuration configuration = new Configuration(false);
- boolean isSecurityEnabled = false;
- for (Map.Entry<String, String> entry : properties.entrySet()) {
- if (!entry.getKey().equals(HiveTable.HIVE_METASTORE_URIS)) {
- configuration.set(entry.getKey(), entry.getValue());
- }
- if (entry.getKey().equals(BrokerUtil.HADOOP_SECURITY_AUTHENTICATION)
- && entry.getValue().equals(AuthType.KERBEROS.getDesc())) {
- isSecurityEnabled = true;
- }
- }
- if (onS3) {
- setS3Configuration(configuration, properties);
- }
- String location = table.getSd().getLocation();
- org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(location);
- try {
- if (isSecurityEnabled) {
- UserGroupInformation.setConfiguration(configuration);
- // login user from keytab
- UserGroupInformation.loginUserFromKeytab(properties.get(BrokerUtil.HADOOP_KERBEROS_PRINCIPAL),
- properties.get(BrokerUtil.HADOOP_KERBEROS_KEYTAB));
- }
- FileSystem fileSystem = path.getFileSystem(configuration);
- iterators.add(fileSystem.listLocatedStatus(path));
- } catch (IOException e) {
- LOG.warn("Get HDFS file remote iterator failed. {}" + e.getMessage());
- throw new DdlException("Get HDFS file remote iterator failed. Error: " + e.getMessage());
- }
- return iterators;
- }
-
public static List<String> getPartitionNames(HiveTable hiveTable) throws DdlException {
HiveMetaStoreClient client = getClient(hiveTable.getHiveProperties().get(HiveTable.HIVE_METASTORE_URIS));
List<String> partitionNames = new ArrayList<>();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
index 6702f689cb..1ae05ff630 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
@@ -272,8 +272,8 @@ public class ExternalFileScanNode extends ExternalScanNode {
try {
buildScanRange();
} catch (IOException e) {
- LOG.error("Finalize failed.", e);
- throw new UserException("Finalize failed.", e);
+ LOG.warn("Finalize failed.", e);
+ throw new UserException("Finalize failed: " + e.getMessage());
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java
index 39ac68773a..eb39a736a7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java
@@ -39,6 +39,8 @@ import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
@@ -50,6 +52,8 @@ import java.util.stream.Collectors;
* A HiveScanProvider to get information for scan node.
*/
public class ExternalHiveScanProvider implements ExternalFileScanProvider {
+ private static final Logger LOG = LogManager.getLogger(ExternalHiveScanProvider.class);
+
protected HMSExternalTable hmsTable;
public ExternalHiveScanProvider(HMSExternalTable hmsTable) {
@@ -112,33 +116,40 @@ public class ExternalHiveScanProvider implements ExternalFileScanProvider {
InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(configuration, inputFormatName, false);
List<InputSplit> splits;
if (!hivePartitions.isEmpty()) {
- splits = hivePartitions.stream()
- .flatMap(x -> getSplitsByPath(inputFormat, configuration, x.getSd().getLocation()).stream())
- .collect(Collectors.toList());
+ try {
+ splits = hivePartitions.stream().flatMap(x -> {
+ try {
+ return getSplitsByPath(inputFormat, configuration, x.getSd().getLocation()).stream();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }).collect(Collectors.toList());
+ } catch (RuntimeException e) {
+ throw new IOException(e);
+ }
} else {
splits = getSplitsByPath(inputFormat, configuration, splitsPath);
}
- return HiveBucketUtil.getPrunedSplitsByBuckets(
- splits,
- hmsTable.getName(),
- exprs,
- getRemoteHiveTable().getSd().getBucketCols(),
- getRemoteHiveTable().getSd().getNumBuckets(),
+ return HiveBucketUtil.getPrunedSplitsByBuckets(splits, hmsTable.getName(), exprs,
+ getRemoteHiveTable().getSd().getBucketCols(), getRemoteHiveTable().getSd().getNumBuckets(),
getRemoteHiveTable().getParameters());
}
- private List<InputSplit> getSplitsByPath(
- InputFormat<?, ?> inputFormat,
- Configuration configuration,
- String splitsPath) {
+ private List<InputSplit> getSplitsByPath(InputFormat<?, ?> inputFormat, Configuration configuration,
+ String splitsPath) throws IOException {
JobConf jobConf = new JobConf(configuration);
+ // For Tez engine, it may generate subdirectories for "union" query.
+ // So there may be files and directories in the table directory at the same time. eg:
+ // /user/hive/warehouse/region_tmp_union_all2/000000_0
+ // /user/hive/warehouse/region_tmp_union_all2/1
+ // /user/hive/warehouse/region_tmp_union_all2/2
+ // So we need to set this config to support visit dir recursively.
+ // Otherwise, getSplits() may throw exception: "Not a file xxx"
+ // https://blog.actorsfit.com/a?ID=00550-ce56ec63-1bff-4b0c-a6f7-447b93efaa31
+ jobConf.set("mapreduce.input.fileinputformat.input.dir.recursive", "true");
FileInputFormat.setInputPaths(jobConf, splitsPath);
- try {
- InputSplit[] splits = inputFormat.getSplits(jobConf, 0);
- return Lists.newArrayList(splits);
- } catch (IOException e) {
- return new ArrayList<InputSplit>();
- }
+ InputSplit[] splits = inputFormat.getSplits(jobConf, 0);
+ return Lists.newArrayList(splits);
}
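For context on the getSplitsByPath() change above, here is a small self-contained sketch of how the recursive-input-dir flag is applied to a JobConf before computing splits. This is not the Doris code: the class name, method name, and the choice of TextInputFormat are assumptions for illustration only.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public class RecursiveSplitSketch {
    // Compute splits for a table directory that may contain Tez-created subdirectories.
    public static InputSplit[] splitsForDir(Configuration base, String tableDir) throws IOException {
        JobConf jobConf = new JobConf(base);
        // Without this flag, FileInputFormat.getSplits() fails with "Not a file: <subdir>"
        // when the input path contains directories instead of plain data files.
        jobConf.set("mapreduce.input.fileinputformat.input.dir.recursive", "true");
        FileInputFormat.setInputPaths(jobConf, tableDir);
        // TextInputFormat stands in for whatever InputFormat the table actually uses.
        InputFormat<?, ?> inputFormat = ReflectionUtils.newInstance(TextInputFormat.class, jobConf);
        return inputFormat.getSplits(jobConf, 0);
    }
}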
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]