This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.12 by this push:
     new fb93fdb  [FLINK-20913][hive] Do not use HiveConf constructor to avoid 
overriding properties with default values
fb93fdb is described below

commit fb93fdbe4eaa8149ef4f0e61a94d9d9a32fc5306
Author: dixingxing <[email protected]>
AuthorDate: Thu Jan 21 11:04:21 2021 +0800

    [FLINK-20913][hive] Do not use HiveConf constructor to avoid overriding 
properties with default values
    
    This closes #14706
    
    Co-authored-by: dixingxing <[email protected]>
---
 .../connectors/hive/HiveTableMetaStoreFactory.java |  4 +-
 .../flink/connectors/hive/HiveTableSink.java       |  4 +-
 .../hive/read/HivePartitionFetcherContextBase.java |  5 +-
 .../flink/connectors/hive/util/HiveConfUtils.java  | 43 ++++++++++++++
 .../connectors/hive/util/HivePartitionUtils.java   |  3 +-
 .../connectors/hive/util/HiveConfUtilsTest.java    | 68 ++++++++++++++++++++++
 6 files changed, 118 insertions(+), 9 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableMetaStoreFactory.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableMetaStoreFactory.java
index 9dc5b04..c379b66 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableMetaStoreFactory.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableMetaStoreFactory.java
@@ -18,13 +18,13 @@
 
 package org.apache.flink.connectors.hive;
 
+import org.apache.flink.connectors.hive.util.HiveConfUtils;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
 import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
 import org.apache.flink.table.filesystem.TableMetaStoreFactory;
 
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -69,7 +69,7 @@ public class HiveTableMetaStoreFactory implements 
TableMetaStoreFactory {
         private HiveTableMetaStore() throws TException {
             client =
                     HiveMetastoreClientFactory.create(
-                            new HiveConf(conf.conf(), HiveConf.class), 
hiveVersion);
+                            HiveConfUtils.create(conf.conf()), hiveVersion);
             sd = client.getTable(database, tableName).getSd();
         }
 
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index af53091..29a4f2c 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.connectors.hive.read.HiveCompactReaderFactory;
+import org.apache.flink.connectors.hive.util.HiveConfUtils;
 import org.apache.flink.connectors.hive.write.HiveBulkWriterFactory;
 import org.apache.flink.connectors.hive.write.HiveOutputFormatFactory;
 import org.apache.flink.connectors.hive.write.HiveWriterFactory;
@@ -142,8 +143,7 @@ public class HiveTableSink implements DynamicTableSink, 
SupportsPartitioning, Su
         checkAcidTable(catalogTable, identifier.toObjectPath());
 
         try (HiveMetastoreClientWrapper client =
-                HiveMetastoreClientFactory.create(
-                        new HiveConf(jobConf, HiveConf.class), hiveVersion)) {
+                
HiveMetastoreClientFactory.create(HiveConfUtils.create(jobConf), hiveVersion)) {
             Table table = client.getTable(identifier.getDatabaseName(), 
identifier.getObjectName());
             StorageDescriptor sd = table.getSd();
 
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HivePartitionFetcherContextBase.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HivePartitionFetcherContextBase.java
index d49dace..3e67de1 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HivePartitionFetcherContextBase.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HivePartitionFetcherContextBase.java
@@ -21,6 +21,7 @@ package org.apache.flink.connectors.hive.read;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connectors.hive.ConsumeOrder;
 import org.apache.flink.connectors.hive.JobConfWrapper;
+import org.apache.flink.connectors.hive.util.HiveConfUtils;
 import org.apache.flink.connectors.hive.util.HivePartitionUtils;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.client.HiveShim;
@@ -33,7 +34,6 @@ import org.apache.flink.table.types.DataType;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
@@ -99,8 +99,7 @@ public abstract class HivePartitionFetcherContextBase<P> 
implements PartitionFet
 
     @Override
     public void open() throws Exception {
-        metaStoreClient =
-                hiveShim.getHiveMetastoreClient(new 
HiveConf(confWrapper.conf(), HiveConf.class));
+        metaStoreClient = 
hiveShim.getHiveMetastoreClient(HiveConfUtils.create(confWrapper.conf()));
         table = metaStoreClient.getTable(tablePath.getDatabaseName(), 
tablePath.getObjectName());
         tableSd = table.getSd();
         tableProps = HiveReflectionUtils.getTableMetadata(hiveShim, table);
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/util/HiveConfUtils.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/util/HiveConfUtils.java
new file mode 100644
index 0000000..ba81db4
--- /dev/null
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/util/HiveConfUtils.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+/** Utils to create HiveConf, see FLINK-20913 for more information. */
+public class HiveConfUtils {
+
+    /**
+     * Creates a {@link HiveConf} from a Hadoop configuration. The constructor {@link
+     * HiveConf#HiveConf(org.apache.hadoop.conf.Configuration, java.lang.Class)} overrides
+     * properties of the given configuration with Hive default values ({@link
+     * org.apache.hadoop.hive.conf.HiveConf.ConfVars}), so this method adds the configuration
+     * again as a resource afterwards; use it instead of calling that constructor directly.
+     *
+     * @param conf Hadoop configuration
+     * @return HiveConf instance
+     */
+    public static HiveConf create(Configuration conf) {
+        HiveConf hiveConf = new HiveConf(conf, HiveConf.class);
+        // re-add conf so that its Hive properties are not overridden by the defaults
+        hiveConf.addResource(conf);
+        return hiveConf;
+    }
+}
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/util/HivePartitionUtils.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/util/HivePartitionUtils.java
index 80d7ec3..48990ea 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/util/HivePartitionUtils.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/util/HivePartitionUtils.java
@@ -111,8 +111,7 @@ public class HivePartitionUtils {
             List<Map<String, String>> remainingPartitions) {
         List<HiveTablePartition> allHivePartitions = new ArrayList<>();
         try (HiveMetastoreClientWrapper client =
-                HiveMetastoreClientFactory.create(
-                        new HiveConf(jobConf, HiveConf.class), hiveVersion)) {
+                
HiveMetastoreClientFactory.create(HiveConfUtils.create(jobConf), hiveVersion)) {
             String dbName = tablePath.getDatabaseName();
             String tableName = tablePath.getObjectName();
             List<String> partitionColNames = catalogTable.getPartitionKeys();
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/util/HiveConfUtilsTest.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/util/HiveConfUtilsTest.java
new file mode 100644
index 0000000..bc13505
--- /dev/null
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/util/HiveConfUtilsTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive.util;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+/** Test for {@link HiveConfUtils}. */
+public class HiveConfUtilsTest {
+    private static final String HIVE_SITE_CONTENT =
+            "<?xml version=\"1.0\"?>\n"
+                    + "<?xml-stylesheet type=\"text/xsl\" 
href=\"configuration.xsl\"?>\n"
+                    + "<configuration>\n"
+                    + "  <property>\n"
+                    + "    <name>hive.metastore.sasl.enabled</name>\n"
+                    + "    <value>true</value>\n"
+                    + "  </property>\n"
+                    + "</configuration>\n";
+
+    @Test
+    public void testCreateHiveConf() {
+        HiveConf hiveConf = createHiveConf();
+        
Assert.assertTrue(hiveConf.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL));
+
+        // the plain constructor overrides configurations from `hiveConf` with Hive default
+        // values (those whose default value is null or an empty string), so this reads false
+        Assert.assertFalse(
+                new HiveConf(hiveConf, HiveConf.class)
+                        
.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL));
+
+        Assert.assertTrue(
+                HiveConfUtils.create(hiveConf)
+                        
.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL));
+    }
+
+    private HiveConf createHiveConf() {
+        HiveConf hiveConf = new HiveConf();
+        try (InputStream inputStream =
+                new 
ByteArrayInputStream(HIVE_SITE_CONTENT.getBytes(StandardCharsets.UTF_8))) {
+            hiveConf.addResource(inputStream, "for_test");
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return hiveConf;
+    }
+}

Reply via email to