This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 0767fd8 [FLINK-20913][hive] Do not use HiveConf constructor to avoid
overriding properties with default values
0767fd8 is described below
commit 0767fd8d709adbc01328084f9b7099fce6d26846
Author: dixingxing <[email protected]>
AuthorDate: Thu Jan 21 11:05:02 2021 +0800
[FLINK-20913][hive] Do not use HiveConf constructor to avoid overriding
properties with default values
This closes #14707
Co-authored-by: dixingxing <[email protected]>
---
.../connectors/hive/HiveTableMetaStoreFactory.java | 4 +-
.../flink/connectors/hive/HiveTableSink.java | 4 +-
.../flink/connectors/hive/HiveTableSource.java | 4 +-
.../read/HiveContinuousMonitoringFunction.java | 4 +-
.../flink/connectors/hive/util/HiveConfUtils.java | 43 ++++++++++++++
.../flink/table/catalog/hive/HiveCatalog.java | 3 +-
.../connectors/hive/util/HiveConfUtilsTest.java | 68 ++++++++++++++++++++++
7 files changed, 121 insertions(+), 9 deletions(-)
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableMetaStoreFactory.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableMetaStoreFactory.java
index 9dc5b04..c379b66 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableMetaStoreFactory.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableMetaStoreFactory.java
@@ -18,13 +18,13 @@
package org.apache.flink.connectors.hive;
+import org.apache.flink.connectors.hive.util.HiveConfUtils;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
import org.apache.flink.table.filesystem.TableMetaStoreFactory;
-import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -69,7 +69,7 @@ public class HiveTableMetaStoreFactory implements
TableMetaStoreFactory {
private HiveTableMetaStore() throws TException {
client =
HiveMetastoreClientFactory.create(
- new HiveConf(conf.conf(), HiveConf.class),
hiveVersion);
+ HiveConfUtils.create(conf.conf()), hiveVersion);
sd = client.getTable(database, tableName).getSd();
}
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index b99e789..735ebb1 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -20,6 +20,7 @@ package org.apache.flink.connectors.hive;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connectors.hive.util.HiveConfUtils;
import org.apache.flink.connectors.hive.write.HiveBulkWriterFactory;
import org.apache.flink.connectors.hive.write.HiveOutputFormatFactory;
import org.apache.flink.connectors.hive.write.HiveWriterFactory;
@@ -130,8 +131,7 @@ public class HiveTableSink
String dbName = identifier.getDatabaseName();
String tableName = identifier.getObjectName();
try (HiveMetastoreClientWrapper client =
- HiveMetastoreClientFactory.create(
- new HiveConf(jobConf, HiveConf.class), hiveVersion)) {
+
HiveMetastoreClientFactory.create(HiveConfUtils.create(jobConf), hiveVersion)) {
Table table = client.getTable(dbName, tableName);
StorageDescriptor sd = table.getSd();
HiveTableMetaStoreFactory msFactory =
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
index fd34d7d..ee26d36 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
@@ -27,6 +27,7 @@ import
org.apache.flink.connectors.hive.read.HiveContinuousMonitoringFunction;
import org.apache.flink.connectors.hive.read.HiveTableFileInputFormat;
import org.apache.flink.connectors.hive.read.HiveTableInputFormat;
import org.apache.flink.connectors.hive.read.TimestampedHiveInputSplit;
+import org.apache.flink.connectors.hive.util.HiveConfUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -440,8 +441,7 @@ public class HiveTableSource
// major
// refactoring. We will postpone this until we merge Blink to Flink.
try (HiveMetastoreClientWrapper client =
- HiveMetastoreClientFactory.create(
- new HiveConf(jobConf, HiveConf.class), hiveVersion)) {
+
HiveMetastoreClientFactory.create(HiveConfUtils.create(jobConf), hiveVersion)) {
String dbName = tablePath.getDatabaseName();
String tableName = tablePath.getObjectName();
List<String> partitionColNames = catalogTable.getPartitionKeys();
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousMonitoringFunction.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousMonitoringFunction.java
index 0064bc6..3468496 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousMonitoringFunction.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousMonitoringFunction.java
@@ -27,6 +27,7 @@ import org.apache.flink.connectors.hive.ConsumeOrder;
import org.apache.flink.connectors.hive.HiveTablePartition;
import org.apache.flink.connectors.hive.HiveTableSource;
import org.apache.flink.connectors.hive.JobConfWrapper;
+import org.apache.flink.connectors.hive.util.HiveConfUtils;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
@@ -182,8 +183,7 @@ public class HiveContinuousMonitoringFunction extends
RichSourceFunction<Timesta
new ListSerializer<>(
new
ListSerializer<>(StringSerializer.INSTANCE))));
- this.client =
- this.hiveShim.getHiveMetastoreClient(new HiveConf(conf.conf(),
HiveConf.class));
+ this.client =
this.hiveShim.getHiveMetastoreClient(HiveConfUtils.create(conf.conf()));
Table hiveTable = client.getTable(tablePath.getDatabaseName(),
tablePath.getObjectName());
this.tableProps = HiveReflectionUtils.getTableMetadata(hiveShim,
hiveTable);
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/util/HiveConfUtils.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/util/HiveConfUtils.java
new file mode 100644
index 0000000..ba81db4
--- /dev/null
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/util/HiveConfUtils.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+/** Utilities for creating {@link HiveConf} instances; see FLINK-20913 for background. */
+public class HiveConfUtils {
+
+ /**
+ * Creates a {@link HiveConf} instance from a Hadoop configuration.
+ *
+ * <p>Since {@link HiveConf#HiveConf(org.apache.hadoop.conf.Configuration, java.lang.Class)}
+ * will override properties in the Hadoop configuration with Hive default values ({@link
+ * org.apache.hadoop.hive.conf.HiveConf.ConfVars}), this method should be used to create a
+ * HiveConf instance from a Hadoop configuration instead of calling that constructor directly.
+ *
+ * @param conf Hadoop configuration
+ * @return HiveConf instance
+ */
+ public static HiveConf create(Configuration conf) {
+ HiveConf hiveConf = new HiveConf(conf, HiveConf.class);
+ // re-add conf as a resource to make sure its Hive configuration properties are not overridden
+ hiveConf.addResource(conf);
+ return hiveConf;
+ }
+}
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 1e931b8..9d54606 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
import org.apache.flink.connectors.hive.HiveDynamicTableFactory;
import org.apache.flink.connectors.hive.HiveTableFactory;
+import org.apache.flink.connectors.hive.util.HiveConfUtils;
import org.apache.flink.sql.parser.hive.ddl.HiveDDLUtils;
import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable.AlterTableOp;
import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveDatabase;
@@ -223,7 +224,7 @@ public class HiveCatalog extends AbstractCatalog {
break;
}
}
- return new HiveConf(hadoopConf, HiveConf.class);
+ return HiveConfUtils.create(hadoopConf);
}
@VisibleForTesting
diff --git
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/util/HiveConfUtilsTest.java
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/util/HiveConfUtilsTest.java
new file mode 100644
index 0000000..bc13505
--- /dev/null
+++
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/util/HiveConfUtilsTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive.util;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+/** Test for {@link HiveConfUtils}. */
+public class HiveConfUtilsTest {
+ private static final String HIVE_SITE_CONTENT =
+ "<?xml version=\"1.0\"?>\n"
+ + "<?xml-stylesheet type=\"text/xsl\"
href=\"configuration.xsl\"?>\n"
+ + "<configuration>\n"
+ + " <property>\n"
+ + " <name>hive.metastore.sasl.enabled</name>\n"
+ + " <value>true</value>\n"
+ + " </property>\n"
+ + "</configuration>\n";
+
+ @Test
+ public void testCreateHiveConf() {
+ HiveConf hiveConf = createHiveConf();
+
Assert.assertTrue(hiveConf.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL));
+
+ // will override configurations from `hiveConf` with hive default values whose default
+ // value is null or empty string
+ Assert.assertFalse(
+ new HiveConf(hiveConf, HiveConf.class)
+
.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL));
+
+ Assert.assertTrue(
+ HiveConfUtils.create(hiveConf)
+
.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL));
+ }
+
+ private HiveConf createHiveConf() {
+ HiveConf hiveConf = new HiveConf();
+ try (InputStream inputStream =
+ new
ByteArrayInputStream(HIVE_SITE_CONTENT.getBytes(StandardCharsets.UTF_8))) {
+ hiveConf.addResource(inputStream, "for_test");
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return hiveConf;
+ }
+}