This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 0767fd8  [FLINK-20913][hive] Do not use HiveConf constructor to avoid 
overriding properties with default values
0767fd8 is described below

commit 0767fd8d709adbc01328084f9b7099fce6d26846
Author: dixingxing <[email protected]>
AuthorDate: Thu Jan 21 11:05:02 2021 +0800

    [FLINK-20913][hive] Do not use HiveConf constructor to avoid overriding 
properties with default values
    
    This closes #14707
    
    Co-authored-by: dixingxing <[email protected]>
---
 .../connectors/hive/HiveTableMetaStoreFactory.java |  4 +-
 .../flink/connectors/hive/HiveTableSink.java       |  4 +-
 .../flink/connectors/hive/HiveTableSource.java     |  4 +-
 .../read/HiveContinuousMonitoringFunction.java     |  4 +-
 .../flink/connectors/hive/util/HiveConfUtils.java  | 43 ++++++++++++++
 .../flink/table/catalog/hive/HiveCatalog.java      |  3 +-
 .../connectors/hive/util/HiveConfUtilsTest.java    | 68 ++++++++++++++++++++++
 7 files changed, 121 insertions(+), 9 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableMetaStoreFactory.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableMetaStoreFactory.java
index 9dc5b04..c379b66 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableMetaStoreFactory.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableMetaStoreFactory.java
@@ -18,13 +18,13 @@
 
 package org.apache.flink.connectors.hive;
 
+import org.apache.flink.connectors.hive.util.HiveConfUtils;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
 import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
 import org.apache.flink.table.filesystem.TableMetaStoreFactory;
 
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -69,7 +69,7 @@ public class HiveTableMetaStoreFactory implements 
TableMetaStoreFactory {
         private HiveTableMetaStore() throws TException {
             client =
                     HiveMetastoreClientFactory.create(
-                            new HiveConf(conf.conf(), HiveConf.class), 
hiveVersion);
+                            HiveConfUtils.create(conf.conf()), hiveVersion);
             sd = client.getTable(database, tableName).getSd();
         }
 
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index b99e789..735ebb1 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -20,6 +20,7 @@ package org.apache.flink.connectors.hive;
 
 import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connectors.hive.util.HiveConfUtils;
 import org.apache.flink.connectors.hive.write.HiveBulkWriterFactory;
 import org.apache.flink.connectors.hive.write.HiveOutputFormatFactory;
 import org.apache.flink.connectors.hive.write.HiveWriterFactory;
@@ -130,8 +131,7 @@ public class HiveTableSink
         String dbName = identifier.getDatabaseName();
         String tableName = identifier.getObjectName();
         try (HiveMetastoreClientWrapper client =
-                HiveMetastoreClientFactory.create(
-                        new HiveConf(jobConf, HiveConf.class), hiveVersion)) {
+                
HiveMetastoreClientFactory.create(HiveConfUtils.create(jobConf), hiveVersion)) {
             Table table = client.getTable(dbName, tableName);
             StorageDescriptor sd = table.getSd();
             HiveTableMetaStoreFactory msFactory =
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
index fd34d7d..ee26d36 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.connectors.hive.read.HiveContinuousMonitoringFunction;
 import org.apache.flink.connectors.hive.read.HiveTableFileInputFormat;
 import org.apache.flink.connectors.hive.read.HiveTableInputFormat;
 import org.apache.flink.connectors.hive.read.TimestampedHiveInputSplit;
+import org.apache.flink.connectors.hive.util.HiveConfUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -440,8 +441,7 @@ public class HiveTableSource
         // major
         // refactoring. We will postpone this until we merge Blink to Flink.
         try (HiveMetastoreClientWrapper client =
-                HiveMetastoreClientFactory.create(
-                        new HiveConf(jobConf, HiveConf.class), hiveVersion)) {
+                
HiveMetastoreClientFactory.create(HiveConfUtils.create(jobConf), hiveVersion)) {
             String dbName = tablePath.getDatabaseName();
             String tableName = tablePath.getObjectName();
             List<String> partitionColNames = catalogTable.getPartitionKeys();
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousMonitoringFunction.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousMonitoringFunction.java
index 0064bc6..3468496 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousMonitoringFunction.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousMonitoringFunction.java
@@ -27,6 +27,7 @@ import org.apache.flink.connectors.hive.ConsumeOrder;
 import org.apache.flink.connectors.hive.HiveTablePartition;
 import org.apache.flink.connectors.hive.HiveTableSource;
 import org.apache.flink.connectors.hive.JobConfWrapper;
+import org.apache.flink.connectors.hive.util.HiveConfUtils;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
@@ -182,8 +183,7 @@ public class HiveContinuousMonitoringFunction extends 
RichSourceFunction<Timesta
                                         new ListSerializer<>(
                                                 new 
ListSerializer<>(StringSerializer.INSTANCE))));
 
-        this.client =
-                this.hiveShim.getHiveMetastoreClient(new HiveConf(conf.conf(), 
HiveConf.class));
+        this.client = 
this.hiveShim.getHiveMetastoreClient(HiveConfUtils.create(conf.conf()));
 
         Table hiveTable = client.getTable(tablePath.getDatabaseName(), 
tablePath.getObjectName());
         this.tableProps = HiveReflectionUtils.getTableMetadata(hiveShim, 
hiveTable);
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/util/HiveConfUtils.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/util/HiveConfUtils.java
new file mode 100644
index 0000000..ba81db4
--- /dev/null
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/util/HiveConfUtils.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+/** Utils to create HiveConf, see FLINK-20913 for more information. */
+public class HiveConfUtils {
+
+    /**
+     * Create HiveConf instance via Hadoop configuration. Since {@link
+     * HiveConf#HiveConf(org.apache.hadoop.conf.Configuration, 
java.lang.Class)} will override
+     * properties in Hadoop configuration with Hive default values ({@link 
org.apache
+     * .hadoop.hive.conf.HiveConf.ConfVars}), so we should use this method to 
create HiveConf
+     * instance via Hadoop configuration.
+     *
+     * @param conf Hadoop configuration
+     * @return HiveConf instance
+     */
+    public static HiveConf create(Configuration conf) {
+        HiveConf hiveConf = new HiveConf(conf, HiveConf.class);
+        // to make sure Hive configuration properties in conf not be overridden
+        hiveConf.addResource(conf);
+        return hiveConf;
+    }
+}
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 1e931b8..9d54606 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
 import org.apache.flink.connectors.hive.HiveDynamicTableFactory;
 import org.apache.flink.connectors.hive.HiveTableFactory;
+import org.apache.flink.connectors.hive.util.HiveConfUtils;
 import org.apache.flink.sql.parser.hive.ddl.HiveDDLUtils;
 import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable.AlterTableOp;
 import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveDatabase;
@@ -223,7 +224,7 @@ public class HiveCatalog extends AbstractCatalog {
                 break;
             }
         }
-        return new HiveConf(hadoopConf, HiveConf.class);
+        return HiveConfUtils.create(hadoopConf);
     }
 
     @VisibleForTesting
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/util/HiveConfUtilsTest.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/util/HiveConfUtilsTest.java
new file mode 100644
index 0000000..bc13505
--- /dev/null
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/util/HiveConfUtilsTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive.util;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+/** Test for {@link HiveConfUtils}. */
+public class HiveConfUtilsTest {
+    private static final String HIVE_SITE_CONTENT =
+            "<?xml version=\"1.0\"?>\n"
+                    + "<?xml-stylesheet type=\"text/xsl\" 
href=\"configuration.xsl\"?>\n"
+                    + "<configuration>\n"
+                    + "  <property>\n"
+                    + "    <name>hive.metastore.sasl.enabled</name>\n"
+                    + "    <value>true</value>\n"
+                    + "  </property>\n"
+                    + "</configuration>\n";
+
+    @Test
+    public void testCreateHiveConf() {
+        HiveConf hiveConf = createHiveConf();
+        
Assert.assertTrue(hiveConf.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL));
+
+        // will override configurations from `hiveConf` with hive default 
values which default value
+        // is null or empty string
+        Assert.assertFalse(
+                new HiveConf(hiveConf, HiveConf.class)
+                        
.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL));
+
+        Assert.assertTrue(
+                HiveConfUtils.create(hiveConf)
+                        
.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL));
+    }
+
+    private HiveConf createHiveConf() {
+        HiveConf hiveConf = new HiveConf();
+        try (InputStream inputStream =
+                new 
ByteArrayInputStream(HIVE_SITE_CONTENT.getBytes(StandardCharsets.UTF_8))) {
+            hiveConf.addResource(inputStream, "for_test");
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return hiveConf;
+    }
+}

Reply via email to