This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 8b68ca7 [FLINK-13005][hive] HiveCatalog should not add
'flink.is_generic' key for Hive table
8b68ca7 is described below
commit 8b68ca769967f6c2035ceba6ebc25fe6b9988250
Author: bowen.li <[email protected]>
AuthorDate: Thu Jun 27 10:43:31 2019 -0700
[FLINK-13005][hive] HiveCatalog should not add 'flink.is_generic' key for
Hive table
This PR fixes a bug that 'flink.is_generic' key should not add for metadata
for a non-generic catalog table (a.k.a a Hive table).
This closes #8904.
---
.../apache/flink/table/catalog/hive/HiveCatalog.java | 20 +++++++++++---------
.../flink/table/catalog/config/CatalogConfig.java | 6 ++++++
.../apache/flink/table/catalog/CatalogTestUtil.java | 3 +++
3 files changed, 20 insertions(+), 9 deletions(-)
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 237b105..8659a80 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.catalog.hive;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.batch.connectors.hive.HiveTableFactory;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.AbstractCatalog;
@@ -93,6 +94,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
+import static
org.apache.flink.table.catalog.config.CatalogConfig.FLINK_PROPERTY_PREFIX;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -107,11 +109,6 @@ public class HiveCatalog extends AbstractCatalog {
private static final StorageFormatFactory storageFormatFactory = new
StorageFormatFactory();
private static final String DEFAULT_HIVE_TABLE_STORAGE_FORMAT =
"TextFile";
- // Prefix used to distinguish properties created by Hive and Flink,
- // as Hive metastore has its own properties created upon table creation
and migration between different versions of metastore.
- private static final String FLINK_PROPERTY_PREFIX = "flink.";
- private static final String FLINK_PROPERTY_IS_GENERIC =
FLINK_PROPERTY_PREFIX + CatalogConfig.IS_GENERIC;
-
// Prefix used to distinguish Flink functions from Hive functions.
// It's appended to Flink function's class name
// because Hive's Function object doesn't have properties or other
place to store the flag for Flink functions.
@@ -482,7 +479,7 @@ public class HiveCatalog extends AbstractCatalog {
// Table properties
Map<String, String> properties = hiveTable.getParameters();
- boolean isGeneric =
Boolean.valueOf(properties.computeIfAbsent(FLINK_PROPERTY_IS_GENERIC, k ->
String.valueOf(false)));
+ boolean isGeneric =
Boolean.valueOf(properties.get(CatalogConfig.IS_GENERIC));
if (isGeneric) {
properties = retrieveFlinkProperties(properties);
}
@@ -580,20 +577,25 @@ public class HiveCatalog extends AbstractCatalog {
/**
* Filter out Hive-created properties, and return Flink-created
properties.
+ * Note that 'is_generic' is a special key and this method will leave
it as-is.
*/
private static Map<String, String> retrieveFlinkProperties(Map<String,
String> hiveTableParams) {
return hiveTableParams.entrySet().stream()
- .filter(e ->
e.getKey().startsWith(FLINK_PROPERTY_PREFIX))
+ .filter(e ->
e.getKey().startsWith(FLINK_PROPERTY_PREFIX) ||
e.getKey().equals(CatalogConfig.IS_GENERIC))
.collect(Collectors.toMap(e ->
e.getKey().replace(FLINK_PROPERTY_PREFIX, ""), e -> e.getValue()));
}
/**
* Add a prefix to Flink-created properties to distinguish them from
Hive-created properties.
+ * Note that 'is_generic' is a special key and this method will leave
it as-is.
*/
private static Map<String, String> maskFlinkProperties(Map<String,
String> properties) {
return properties.entrySet().stream()
.filter(e -> e.getKey() != null && e.getValue() != null)
- .collect(Collectors.toMap(e -> FLINK_PROPERTY_PREFIX +
e.getKey(), e -> e.getValue()));
+ .map(e -> new Tuple2<>(
+ e.getKey().equals(CatalogConfig.IS_GENERIC) ?
e.getKey() : FLINK_PROPERTY_PREFIX + e.getKey(),
+ e.getValue()))
+ .collect(Collectors.toMap(t -> t.f0, t -> t.f1));
}
// ------ partitions ------
@@ -769,7 +771,7 @@ public class HiveCatalog extends AbstractCatalog {
// make sure both table and partition are generic, or neither is
private static void ensureTableAndPartitionMatch(Table hiveTable,
CatalogPartition catalogPartition) {
- boolean isGeneric =
Boolean.valueOf(hiveTable.getParameters().get(FLINK_PROPERTY_IS_GENERIC));
+ boolean isGeneric =
Boolean.valueOf(hiveTable.getParameters().get(CatalogConfig.IS_GENERIC));
if ((isGeneric && catalogPartition instanceof
HiveCatalogPartition) ||
(!isGeneric && catalogPartition instanceof
GenericCatalogPartition)) {
throw new CatalogException(String.format("Cannot handle
%s partition for %s table",
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogConfig.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogConfig.java
index 05afa32..7a4a624 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogConfig.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogConfig.java
@@ -24,4 +24,10 @@ package org.apache.flink.table.catalog.config;
public class CatalogConfig {
public static final String IS_GENERIC = "is_generic";
+
+ // Globally reserved prefix for catalog properties.
+ // User defined properties should not with this prefix.
+ // Used to distinguish properties created by Hive and Flink,
+ // as Hive metastore has its own properties created upon table creation
and migration between different versions of metastore.
+ public static final String FLINK_PROPERTY_PREFIX = "flink.";
}
diff --git
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
index 905a44c..1c64025 100644
---
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
+++
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.plan.stats.TableStats;
import java.util.Map;
+import static
org.apache.flink.table.catalog.config.CatalogConfig.FLINK_PROPERTY_PREFIX;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -57,6 +58,7 @@ public class CatalogTestUtil {
if
(Boolean.valueOf(t1.getProperties().get(CatalogConfig.IS_GENERIC))) {
assertEquals(t1.getProperties(), t2.getProperties());
} else {
+
assertTrue(t2.getProperties().keySet().stream().noneMatch(k ->
k.startsWith(FLINK_PROPERTY_PREFIX)));
assertTrue(t2.getProperties().entrySet().containsAll(t1.getProperties().entrySet()));
}
}
@@ -73,6 +75,7 @@ public class CatalogTestUtil {
if
(Boolean.valueOf(v1.getProperties().get(CatalogConfig.IS_GENERIC))) {
assertEquals(v1.getProperties(), v2.getProperties());
} else {
+
assertTrue(v2.getProperties().keySet().stream().noneMatch(k ->
k.startsWith(FLINK_PROPERTY_PREFIX)));
assertTrue(v2.getProperties().entrySet().containsAll(v1.getProperties().entrySet()));
}
}