This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b20ac47 [FLINK-13424][hive] HiveCatalog should add hive version in
conf
b20ac47 is described below
commit b20ac47258228aed51be0d0e6fc958ac303ce4a1
Author: Rui Li <[email protected]>
AuthorDate: Fri Jul 26 11:16:49 2019 +0800
[FLINK-13424][hive] HiveCatalog should add hive version in conf
To avoid overriding the hive version users specify in the yaml file.
This closes #9232.
---
.../java/org/apache/flink/connectors/hive/HiveTableFactory.java | 5 +++--
.../org/apache/flink/connectors/hive/HiveTableOutputFormat.java | 3 ++-
.../main/java/org/apache/flink/connectors/hive/HiveTableSink.java | 4 ++--
.../main/java/org/apache/flink/connectors/hive/HiveTableSource.java | 4 ++--
.../main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java | 3 +++
.../flink/table/catalog/hive/client/HiveMetastoreClientFactory.java | 6 +++---
.../java/org/apache/flink/connectors/hive/HiveInputFormatTest.java | 5 +++--
.../org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java | 3 ++-
8 files changed, 20 insertions(+), 13 deletions(-)
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java
index 4127bf3..73626d0 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java
@@ -25,7 +25,6 @@ import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.config.CatalogConfig;
-import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
import org.apache.flink.table.factories.FunctionDefinitionFactory;
import org.apache.flink.table.factories.TableFactoryUtil;
@@ -76,7 +75,9 @@ public class HiveTableFactory
public HiveTableFactory(HiveConf hiveConf) {
this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be
null");
- this.hiveVersion = new
JobConf(hiveConf).get(HiveCatalogValidator.CATALOG_HIVE_VERSION,
HiveShimLoader.getHiveVersion());
+ // this has to come from hiveConf, otherwise we may lose what
user specifies in the yaml file
+ this.hiveVersion =
checkNotNull(hiveConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION),
+ "Hive version is not defined");
}
@Override
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
index 52436cc..e4caac1 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
@@ -163,7 +163,8 @@ public class HiveTableOutputFormat extends
HadoopOutputFormatCommonBase<Row> imp
this.overwrite = overwrite;
isPartitioned = partitionColumns != null &&
!partitionColumns.isEmpty();
isDynamicPartition = isPartitioned && partitionColumns.size() >
hiveTablePartition.getPartitionSpec().size();
- hiveVersion =
jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION,
HiveShimLoader.getHiveVersion());
+ hiveVersion =
Preconditions.checkNotNull(jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION),
+ "Hive version is not defined");
}
// Custom serialization methods
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index 7d8a85a..a36479d 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -27,7 +27,6 @@ import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
-import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
import org.apache.flink.table.sinks.OutputFormatTableSink;
import org.apache.flink.table.sinks.OverwritableTableSink;
@@ -76,7 +75,8 @@ public class HiveTableSink extends OutputFormatTableSink<Row>
implements Partiti
this.jobConf = jobConf;
this.tablePath = tablePath;
this.catalogTable = table;
- hiveVersion =
jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION,
HiveShimLoader.getHiveVersion());
+ hiveVersion =
Preconditions.checkNotNull(jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION),
+ "Hive version is not defined");
TableSchema tableSchema = table.getSchema();
rowTypeInfo = new RowTypeInfo(tableSchema.getFieldTypes(),
tableSchema.getFieldNames());
}
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
index 2df10c5..a5a670e 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
@@ -27,7 +27,6 @@ import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
-import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
import org.apache.flink.table.sources.InputFormatTableSource;
import org.apache.flink.table.sources.PartitionableTableSource;
@@ -72,7 +71,8 @@ public class HiveTableSource extends
InputFormatTableSource<Row> implements Part
this.jobConf = Preconditions.checkNotNull(jobConf);
this.tablePath = Preconditions.checkNotNull(tablePath);
this.catalogTable = Preconditions.checkNotNull(catalogTable);
- this.hiveVersion =
jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION,
HiveShimLoader.getHiveVersion());
+ this.hiveVersion =
Preconditions.checkNotNull(jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION),
+ "Hive version is not defined");
initAllPartitions = false;
}
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 7036196..ba0921c 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -53,6 +53,7 @@ import
org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
+import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
import org.apache.flink.table.catalog.hive.util.HiveStatsUtil;
import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
@@ -135,6 +136,8 @@ public class HiveCatalog extends AbstractCatalog {
this.hiveConf = hiveConf == null ? createHiveConf(null) :
hiveConf;
checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveVersion),
"hiveVersion cannot be null or empty");
this.hiveVersion = hiveVersion;
+ // add this to hiveConf to make sure table factory and
source/sink see the same Hive version as HiveCatalog
+ this.hiveConf.set(HiveCatalogValidator.CATALOG_HIVE_VERSION,
hiveVersion);
LOG.info("Created HiveCatalog '{}'", catalogName);
}
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientFactory.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientFactory.java
index ccdca1b..dc4c6da 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientFactory.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientFactory.java
@@ -18,7 +18,7 @@
package org.apache.flink.table.catalog.hive.client;
-import org.apache.flink.util.StringUtils;
+import org.apache.flink.util.Preconditions;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -31,7 +31,7 @@ public class HiveMetastoreClientFactory {
}
public static HiveMetastoreClientWrapper create(HiveConf hiveConf,
String hiveVersion) {
- return new HiveMetastoreClientWrapper(hiveConf,
- StringUtils.isNullOrWhitespaceOnly(hiveVersion)
? HiveShimLoader.getHiveVersion() : hiveVersion);
+ Preconditions.checkNotNull(hiveVersion, "Hive version cannot be
null");
+ return new HiveMetastoreClientWrapper(hiveConf, hiveVersion);
}
}
diff --git
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveInputFormatTest.java
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveInputFormatTest.java
index 19d6da2..b9a1527 100644
---
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveInputFormatTest.java
+++
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveInputFormatTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
@@ -92,7 +93,7 @@ public class HiveInputFormatTest {
);
//Now we used metaStore client to create hive table instead of
using hiveCatalog for it doesn't support set
//serDe temporarily.
- HiveMetastoreClientWrapper client =
HiveMetastoreClientFactory.create(hiveConf, null);
+ HiveMetastoreClientWrapper client =
HiveMetastoreClientFactory.create(hiveConf, HiveShimLoader.getHiveVersion());
org.apache.hadoop.hive.metastore.api.Table tbl = new
org.apache.hadoop.hive.metastore.api.Table();
tbl.setDbName(dbName);
tbl.setTableName(tblName);
@@ -142,7 +143,7 @@ public class HiveInputFormatTest {
//Now we used metaStore client to create hive table instead of
using hiveCatalog for it doesn't support set
//serDe temporarily.
- HiveMetastoreClientWrapper client =
HiveMetastoreClientFactory.create(hiveConf, null);
+ HiveMetastoreClientWrapper client =
HiveMetastoreClientFactory.create(hiveConf, HiveShimLoader.getHiveVersion());
org.apache.hadoop.hive.metastore.api.Table tbl = new
org.apache.hadoop.hive.metastore.api.Table();
tbl.setDbName(dbName);
tbl.setTableName(tblName);
diff --git
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
index aa02eb6..4ac76cd 100644
---
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
+++
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import com.klarna.hiverunner.HiveShell;
import com.klarna.hiverunner.annotations.HiveSQL;
@@ -56,7 +57,7 @@ public class TableEnvHiveConnectorTest {
HiveConf hiveConf = hiveShell.getHiveConf();
hiveCatalog = HiveTestUtils.createHiveCatalog(hiveConf);
hiveCatalog.open();
- hmsClient = HiveMetastoreClientFactory.create(hiveConf, null);
+ hmsClient = HiveMetastoreClientFactory.create(hiveConf,
HiveShimLoader.getHiveVersion());
}
@Test