This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 4b8d35c  [FLINK-13424][hive] HiveCatalog should add hive version in conf
4b8d35c is described below

commit 4b8d35c7f9eb6573e3b099fd2ca8d4ca42ccae72
Author: Rui Li <[email protected]>
AuthorDate: Fri Jul 26 11:16:49 2019 +0800

    [FLINK-13424][hive] HiveCatalog should add hive version in conf
    
    This avoids overriding the Hive version that users specify in the YAML file.
    
    This closes #9232.
---
 .../java/org/apache/flink/connectors/hive/HiveTableFactory.java     | 5 +++--
 .../org/apache/flink/connectors/hive/HiveTableOutputFormat.java     | 3 ++-
 .../main/java/org/apache/flink/connectors/hive/HiveTableSink.java   | 4 ++--
 .../main/java/org/apache/flink/connectors/hive/HiveTableSource.java | 4 ++--
 .../main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java  | 3 +++
 .../flink/table/catalog/hive/client/HiveMetastoreClientFactory.java | 6 +++---
 .../java/org/apache/flink/connectors/hive/HiveInputFormatTest.java  | 5 +++--
 .../org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java | 3 ++-
 8 files changed, 20 insertions(+), 13 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java
index 4127bf3..73626d0 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java
@@ -25,7 +25,6 @@ import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.config.CatalogConfig;
-import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
 import org.apache.flink.table.factories.FunctionDefinitionFactory;
 import org.apache.flink.table.factories.TableFactoryUtil;
@@ -76,7 +75,9 @@ public class HiveTableFactory
        public HiveTableFactory(HiveConf hiveConf) {
                this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be 
null");
 
-               this.hiveVersion = new 
JobConf(hiveConf).get(HiveCatalogValidator.CATALOG_HIVE_VERSION, 
HiveShimLoader.getHiveVersion());
+               // this has to come from hiveConf, otherwise we may lose what 
user specifies in the yaml file
+               this.hiveVersion = 
checkNotNull(hiveConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION),
+                               "Hive version is not defined");
        }
 
        @Override
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
index 52436cc..e4caac1 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
@@ -163,7 +163,8 @@ public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase<Row> imp
                this.overwrite = overwrite;
                isPartitioned = partitionColumns != null && 
!partitionColumns.isEmpty();
                isDynamicPartition = isPartitioned && partitionColumns.size() > 
hiveTablePartition.getPartitionSpec().size();
-               hiveVersion = 
jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION, 
HiveShimLoader.getHiveVersion());
+               hiveVersion = 
Preconditions.checkNotNull(jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION),
+                               "Hive version is not defined");
        }
 
        //  Custom serialization methods
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index 7d8a85a..a36479d 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -27,7 +27,6 @@ import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
-import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
 import org.apache.flink.table.sinks.OutputFormatTableSink;
 import org.apache.flink.table.sinks.OverwritableTableSink;
@@ -76,7 +75,8 @@ public class HiveTableSink extends OutputFormatTableSink<Row> 
implements Partiti
                this.jobConf = jobConf;
                this.tablePath = tablePath;
                this.catalogTable = table;
-               hiveVersion = 
jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION, 
HiveShimLoader.getHiveVersion());
+               hiveVersion = 
Preconditions.checkNotNull(jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION),
+                               "Hive version is not defined");
                TableSchema tableSchema = table.getSchema();
                rowTypeInfo = new RowTypeInfo(tableSchema.getFieldTypes(), 
tableSchema.getFieldNames());
        }
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
index 2df10c5..a5a670e 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
@@ -27,7 +27,6 @@ import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
-import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
 import org.apache.flink.table.sources.InputFormatTableSource;
 import org.apache.flink.table.sources.PartitionableTableSource;
@@ -72,7 +71,8 @@ public class HiveTableSource extends 
InputFormatTableSource<Row> implements Part
                this.jobConf = Preconditions.checkNotNull(jobConf);
                this.tablePath = Preconditions.checkNotNull(tablePath);
                this.catalogTable = Preconditions.checkNotNull(catalogTable);
-               this.hiveVersion = 
jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION, 
HiveShimLoader.getHiveVersion());
+               this.hiveVersion = 
Preconditions.checkNotNull(jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION),
+                               "Hive version is not defined");
                initAllPartitions = false;
        }
 
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 7036196..ba0921c 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -53,6 +53,7 @@ import 
org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
 import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
+import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
 import org.apache.flink.table.catalog.hive.util.HiveStatsUtil;
 import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
@@ -135,6 +136,8 @@ public class HiveCatalog extends AbstractCatalog {
                this.hiveConf = hiveConf == null ? createHiveConf(null) : 
hiveConf;
                checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveVersion), 
"hiveVersion cannot be null or empty");
                this.hiveVersion = hiveVersion;
+               // add this to hiveConf to make sure table factory and 
source/sink see the same Hive version as HiveCatalog
+               this.hiveConf.set(HiveCatalogValidator.CATALOG_HIVE_VERSION, 
hiveVersion);
 
                LOG.info("Created HiveCatalog '{}'", catalogName);
        }
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientFactory.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientFactory.java
index ccdca1b..dc4c6da 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientFactory.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientFactory.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.catalog.hive.client;
 
-import org.apache.flink.util.StringUtils;
+import org.apache.flink.util.Preconditions;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 
@@ -31,7 +31,7 @@ public class HiveMetastoreClientFactory {
        }
 
        public static HiveMetastoreClientWrapper create(HiveConf hiveConf, 
String hiveVersion) {
-               return new HiveMetastoreClientWrapper(hiveConf,
-                               StringUtils.isNullOrWhitespaceOnly(hiveVersion) 
? HiveShimLoader.getHiveVersion() : hiveVersion);
+               Preconditions.checkNotNull(hiveVersion, "Hive version cannot be 
null");
+               return new HiveMetastoreClientWrapper(hiveConf, hiveVersion);
        }
 }
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveInputFormatTest.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveInputFormatTest.java
index 19d6da2..b9a1527 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveInputFormatTest.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveInputFormatTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.Row;
@@ -92,7 +93,7 @@ public class HiveInputFormatTest {
                );
                //Now we used metaStore client to create hive table instead of 
using hiveCatalog for it doesn't support set
                //serDe temporarily.
-               HiveMetastoreClientWrapper client = 
HiveMetastoreClientFactory.create(hiveConf, null);
+               HiveMetastoreClientWrapper client = 
HiveMetastoreClientFactory.create(hiveConf, HiveShimLoader.getHiveVersion());
                org.apache.hadoop.hive.metastore.api.Table tbl = new 
org.apache.hadoop.hive.metastore.api.Table();
                tbl.setDbName(dbName);
                tbl.setTableName(tblName);
@@ -142,7 +143,7 @@ public class HiveInputFormatTest {
 
                //Now we used metaStore client to create hive table instead of 
using hiveCatalog for it doesn't support set
                //serDe temporarily.
-               HiveMetastoreClientWrapper client = 
HiveMetastoreClientFactory.create(hiveConf, null);
+               HiveMetastoreClientWrapper client = 
HiveMetastoreClientFactory.create(hiveConf, HiveShimLoader.getHiveVersion());
                org.apache.hadoop.hive.metastore.api.Table tbl = new 
org.apache.hadoop.hive.metastore.api.Table();
                tbl.setDbName(dbName);
                tbl.setTableName(tblName);
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
index aa02eb6..4ac76cd 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 
 import com.klarna.hiverunner.HiveShell;
 import com.klarna.hiverunner.annotations.HiveSQL;
@@ -56,7 +57,7 @@ public class TableEnvHiveConnectorTest {
                HiveConf hiveConf = hiveShell.getHiveConf();
                hiveCatalog = HiveTestUtils.createHiveCatalog(hiveConf);
                hiveCatalog.open();
-               hmsClient = HiveMetastoreClientFactory.create(hiveConf, null);
+               hmsClient = HiveMetastoreClientFactory.create(hiveConf, 
HiveShimLoader.getHiveVersion());
        }
 
        @Test

Reply via email to