This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c86025f [FLINK-21829][hive] Fix exception when custom hadoop conf path does not exist or there is no conf file
c86025f is described below
commit c86025f0a0c4c8a4d51a0298081d4a93db09e130
Author: hehuiyuan <[email protected]>
AuthorDate: Tue Mar 30 15:15:04 2021 +0800
[FLINK-21829][hive] Fix exception when custom hadoop conf path does not exist or there is no conf file
This closes #15427
---
.../flink/table/catalog/hive/HiveCatalog.java | 11 ++++++++++
.../table/catalog/hive/util/HiveTableUtil.java | 20 +++++++++++------
.../hive/factories/HiveCatalogFactoryTest.java | 25 ++++++++++++++++++++++
3 files changed, 50 insertions(+), 6 deletions(-)
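A note on the behavior change for readers skimming the patch: a misconfigured hadoop-conf-dir now fails fast at catalog creation instead of being silently papered over with a default Configuration. A reproduction sketch under assumptions (the string option keys mirror the HiveCatalogFactoryOptions exercised by the new test below; both paths are hypothetical):

    // Register a HiveCatalog whose hadoop-conf-dir exists but contains no *-site.xml files.
    Map<String, String> options = new HashMap<>();
    options.put("type", "hive");                              // CommonCatalogOptions.CATALOG_TYPE
    options.put("hive-conf-dir", "/etc/hive/conf");           // hypothetical path
    options.put("hadoop-conf-dir", "/tmp/empty-hadoop-conf"); // exists, but holds no conf files
    // After this patch the following call throws; before it, the bad directory was
    // silently ignored and problems only surfaced later, far from the misconfiguration.
    Catalog catalog =
            FactoryUtil.createCatalog(
                    "myhive", options, null, Thread.currentThread().getContextClassLoader());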
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 9cd7568..7f0655d 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -103,6 +103,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
@@ -223,6 +224,16 @@ public class HiveCatalog extends AbstractCatalog {
             }
         } else {
             hadoopConf = getHadoopConfiguration(hadoopConfDir);
+            if (hadoopConf == null) {
+                String possibleUsedConfFiles =
+                        "core-site.xml | hdfs-site.xml | yarn-site.xml | mapred-site.xml";
+                throw new CatalogException(
+                        "Failed to load the hadoop conf from specified path: " + hadoopConfDir,
+                        new FileNotFoundException(
+                                "None of the conf files ("
+                                        + possibleUsedConfFiles
+                                        + ") exist in the folder. Please check the path."));
+            }
         }
         if (hadoopConf == null) {
             hadoopConf = new Configuration();
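What callers observe after this hunk, as a minimal sketch (the five-argument String constructor and both paths are assumptions for illustration):

    try {
        HiveCatalog catalog =
                new HiveCatalog(
                        "myhive", "default", "/etc/hive/conf", "/tmp/empty-hadoop-conf", null);
    } catch (CatalogException e) {
        // Message: "Failed to load the hadoop conf from specified path: /tmp/empty-hadoop-conf"
        // Cause:   a FileNotFoundException naming the four conf files that were probed
    }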
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
index d5f725a..8f311c8 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
@@ -454,25 +454,33 @@ public class HiveTableUtil {
      */
     public static Configuration getHadoopConfiguration(String hadoopConfDir) {
         if (new File(hadoopConfDir).exists()) {
-            Configuration hadoopConfiguration = new Configuration();
+            List<File> possibleConfFiles = new ArrayList<File>();
             File coreSite = new File(hadoopConfDir, "core-site.xml");
             if (coreSite.exists()) {
-                hadoopConfiguration.addResource(new Path(coreSite.getAbsolutePath()));
+                possibleConfFiles.add(coreSite);
             }
             File hdfsSite = new File(hadoopConfDir, "hdfs-site.xml");
             if (hdfsSite.exists()) {
-                hadoopConfiguration.addResource(new Path(hdfsSite.getAbsolutePath()));
+                possibleConfFiles.add(hdfsSite);
             }
             File yarnSite = new File(hadoopConfDir, "yarn-site.xml");
             if (yarnSite.exists()) {
-                hadoopConfiguration.addResource(new Path(yarnSite.getAbsolutePath()));
+                possibleConfFiles.add(yarnSite);
             }
             // Add mapred-site.xml. We need to read configurations like compression codec.
             File mapredSite = new File(hadoopConfDir, "mapred-site.xml");
             if (mapredSite.exists()) {
-                hadoopConfiguration.addResource(new Path(mapredSite.getAbsolutePath()));
+                possibleConfFiles.add(mapredSite);
+            }
+            if (possibleConfFiles.isEmpty()) {
+                return null;
+            } else {
+                Configuration hadoopConfiguration = new Configuration();
+                for (File confFile : possibleConfFiles) {
+                    hadoopConfiguration.addResource(new Path(confFile.getAbsolutePath()));
+                }
+                return hadoopConfiguration;
             }
-            return hadoopConfiguration;
         }
         return null;
     }
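The helper's contract after this hunk: null when the directory contributes no recognizable conf files, otherwise a Configuration with each existing file registered. A short sketch of both cases (directory paths hypothetical):

    // Existing but empty directory -> null; HiveCatalog converts this into the
    // descriptive CatalogException shown in the previous hunk.
    Configuration none = HiveTableUtil.getHadoopConfiguration("/tmp/empty-hadoop-conf");
    assert none == null;

    // Directory holding at least one of the four *-site.xml files -> a
    // Configuration with each existing file added via addResource().
    Configuration conf = HiveTableUtil.getHadoopConfiguration("/etc/hadoop/conf");
    assert conf != null;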
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryTest.java
index c6ce2a3..ac61c8e 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -109,6 +110,30 @@ public class HiveCatalogFactoryTest extends TestLogger {
     }
 
     @Test
+    public void testCreateHiveCatalogWithIllegalHadoopConfDir() throws IOException {
+        final String catalogName = "mycatalog";
+
+        final String hadoopConfDir = tempFolder.newFolder().getAbsolutePath();
+
+        try {
+            final Map<String, String> options = new HashMap<>();
+            options.put(
+                    CommonCatalogOptions.CATALOG_TYPE.key(), HiveCatalogFactoryOptions.IDENTIFIER);
+            options.put(HiveCatalogFactoryOptions.HIVE_CONF_DIR.key(), CONF_DIR.getPath());
+            options.put(HiveCatalogFactoryOptions.HADOOP_CONF_DIR.key(), hadoopConfDir);
+
+            final Catalog actualCatalog =
+                    FactoryUtil.createCatalog(
+                            catalogName,
+                            options,
+                            null,
+                            Thread.currentThread().getContextClassLoader());
+            Assert.fail("Expected catalog creation to fail for an empty hadoop conf dir.");
+        } catch (ValidationException e) {
+        }
+    }
+
+    @Test
     public void testLoadHadoopConfigFromEnv() throws IOException {
         Map<String, String> customProps = new HashMap<>();
         String k1 = "what is connector?";