This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.12 by this push:
new fb93fdb [FLINK-20913][hive] Do not use HiveConf constructor to avoid
overriding properties with default values
fb93fdb is described below
commit fb93fdbe4eaa8149ef4f0e61a94d9d9a32fc5306
Author: dixingxing <[email protected]>
AuthorDate: Thu Jan 21 11:04:21 2021 +0800
[FLINK-20913][hive] Do not use HiveConf constructor to avoid overriding
properties with default values
This closes #14706
Co-authored-by: dixingxing <[email protected]>
---
.../connectors/hive/HiveTableMetaStoreFactory.java | 4 +-
.../flink/connectors/hive/HiveTableSink.java | 4 +-
.../hive/read/HivePartitionFetcherContextBase.java | 5 +-
.../flink/connectors/hive/util/HiveConfUtils.java | 43 ++++++++++++++
.../connectors/hive/util/HivePartitionUtils.java | 3 +-
.../connectors/hive/util/HiveConfUtilsTest.java | 68 ++++++++++++++++++++++
6 files changed, 118 insertions(+), 9 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableMetaStoreFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableMetaStoreFactory.java
index 9dc5b04..c379b66 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableMetaStoreFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableMetaStoreFactory.java
@@ -18,13 +18,13 @@
package org.apache.flink.connectors.hive;
+import org.apache.flink.connectors.hive.util.HiveConfUtils;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
import org.apache.flink.table.filesystem.TableMetaStoreFactory;
-import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -69,7 +69,7 @@ public class HiveTableMetaStoreFactory implements TableMetaStoreFactory {
private HiveTableMetaStore() throws TException {
client =
HiveMetastoreClientFactory.create(
- new HiveConf(conf.conf(), HiveConf.class), hiveVersion);
+ HiveConfUtils.create(conf.conf()), hiveVersion);
sd = client.getTable(database, tableName).getSd();
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index af53091..29a4f2c 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connectors.hive.read.HiveCompactReaderFactory;
+import org.apache.flink.connectors.hive.util.HiveConfUtils;
import org.apache.flink.connectors.hive.write.HiveBulkWriterFactory;
import org.apache.flink.connectors.hive.write.HiveOutputFormatFactory;
import org.apache.flink.connectors.hive.write.HiveWriterFactory;
@@ -142,8 +143,7 @@ public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, Su
checkAcidTable(catalogTable, identifier.toObjectPath());
try (HiveMetastoreClientWrapper client =
- HiveMetastoreClientFactory.create(
- new HiveConf(jobConf, HiveConf.class), hiveVersion)) {
+ HiveMetastoreClientFactory.create(HiveConfUtils.create(jobConf), hiveVersion)) {
Table table = client.getTable(identifier.getDatabaseName(),
identifier.getObjectName());
StorageDescriptor sd = table.getSd();
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HivePartitionFetcherContextBase.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HivePartitionFetcherContextBase.java
index d49dace..3e67de1 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HivePartitionFetcherContextBase.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HivePartitionFetcherContextBase.java
@@ -21,6 +21,7 @@ package org.apache.flink.connectors.hive.read;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connectors.hive.ConsumeOrder;
import org.apache.flink.connectors.hive.JobConfWrapper;
+import org.apache.flink.connectors.hive.util.HiveConfUtils;
import org.apache.flink.connectors.hive.util.HivePartitionUtils;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.client.HiveShim;
@@ -33,7 +34,6 @@ import org.apache.flink.table.types.DataType;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
@@ -99,8 +99,7 @@ public abstract class HivePartitionFetcherContextBase<P> implements PartitionFet
@Override
public void open() throws Exception {
- metaStoreClient =
- hiveShim.getHiveMetastoreClient(new HiveConf(confWrapper.conf(), HiveConf.class));
+ metaStoreClient = hiveShim.getHiveMetastoreClient(HiveConfUtils.create(confWrapper.conf()));
table = metaStoreClient.getTable(tablePath.getDatabaseName(),
tablePath.getObjectName());
tableSd = table.getSd();
tableProps = HiveReflectionUtils.getTableMetadata(hiveShim, table);
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/util/HiveConfUtils.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/util/HiveConfUtils.java
new file mode 100644
index 0000000..ba81db4
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/util/HiveConfUtils.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+/** Utils to create HiveConf, see FLINK-20913 for more information. */
+public class HiveConfUtils {
+
+ /**
+ * Create HiveConf instance via Hadoop configuration. Since {@link
+ * HiveConf#HiveConf(org.apache.hadoop.conf.Configuration, java.lang.Class)} will override
+ * properties in Hadoop configuration with Hive default values ({@link org.apache
+ * .hadoop.hive.conf.HiveConf.ConfVars}), so we should use this method to create HiveConf
+ * instance via Hadoop configuration.
+ *
+ * @param conf Hadoop configuration
+ * @return HiveConf instance
+ */
+ public static HiveConf create(Configuration conf) {
+ HiveConf hiveConf = new HiveConf(conf, HiveConf.class);
+ // to make sure Hive configuration properties in conf not be overridden
+ hiveConf.addResource(conf);
+ return hiveConf;
+ }
+}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/util/HivePartitionUtils.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/util/HivePartitionUtils.java
index 80d7ec3..48990ea 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/util/HivePartitionUtils.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/util/HivePartitionUtils.java
@@ -111,8 +111,7 @@ public class HivePartitionUtils {
List<Map<String, String>> remainingPartitions) {
List<HiveTablePartition> allHivePartitions = new ArrayList<>();
try (HiveMetastoreClientWrapper client =
- HiveMetastoreClientFactory.create(
- new HiveConf(jobConf, HiveConf.class), hiveVersion)) {
+ HiveMetastoreClientFactory.create(HiveConfUtils.create(jobConf), hiveVersion)) {
String dbName = tablePath.getDatabaseName();
String tableName = tablePath.getObjectName();
List<String> partitionColNames = catalogTable.getPartitionKeys();
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/util/HiveConfUtilsTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/util/HiveConfUtilsTest.java
new file mode 100644
index 0000000..bc13505
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/util/HiveConfUtilsTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive.util;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+/** Test for {@link HiveConfUtils}. */
+public class HiveConfUtilsTest {
+ private static final String HIVE_SITE_CONTENT =
+ "<?xml version=\"1.0\"?>\n"
+ + "<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>\n"
+ + "<configuration>\n"
+ + " <property>\n"
+ + " <name>hive.metastore.sasl.enabled</name>\n"
+ + " <value>true</value>\n"
+ + " </property>\n"
+ + "</configuration>\n";
+
+ @Test
+ public void testCreateHiveConf() {
+ HiveConf hiveConf = createHiveConf();
+ Assert.assertTrue(hiveConf.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL));
+
+ // will override configurations from `hiveConf` with hive default values which default value
+ // is null or empty string
+ Assert.assertFalse(
+ new HiveConf(hiveConf, HiveConf.class)
+ .getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL));
+
+ Assert.assertTrue(
+ HiveConfUtils.create(hiveConf)
+ .getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL));
+ }
+
+ private HiveConf createHiveConf() {
+ HiveConf hiveConf = new HiveConf();
+ try (InputStream inputStream =
+ new ByteArrayInputStream(HIVE_SITE_CONTENT.getBytes(StandardCharsets.UTF_8))) {
+ hiveConf.addResource(inputStream, "for_test");
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return hiveConf;
+ }
+}