This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 26de0fa [FLINK-12892][table][hive] serialize catalog table to
properties for table discovery service
26de0fa is described below
commit 26de0faab65dfdc08d91393824944ca2fd43fb9b
Author: bowen.li <[email protected]>
AuthorDate: Tue Jun 18 11:57:27 2019 -0700
[FLINK-12892][table][hive] serialize catalog table to properties for table
discovery service
This PR enables serialization of a catalog table to properties, so that the
table discovery service and table factory can instantiate a source/sink from
those properties.
This closes #8784.
---
.../flink/table/catalog/hive/HiveCatalog.java | 5 +-
.../flink/table/catalog/hive/HiveCatalogTable.java | 7 --
.../flink/table/catalog/hive/HiveTableConfig.java | 2 -
.../flink/table/catalog/AbstractCatalogTable.java | 19 +++++
.../flink/table/catalog/GenericCatalogTable.java | 7 --
.../table/catalog/AbstractCatalogTableTest.java | 80 ++++++++++++++++++++++
.../table/catalog/config/CatalogTableConfig.java | 16 +++--
.../table/descriptors/DescriptorProperties.java | 30 ++++++++
8 files changed, 143 insertions(+), 23 deletions(-)
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 0ee99df..3529f46 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -35,6 +35,7 @@ import org.apache.flink.table.catalog.GenericCatalogTable;
import org.apache.flink.table.catalog.GenericCatalogView;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.config.CatalogTableConfig;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
@@ -488,7 +489,7 @@ public class HiveCatalog extends AbstractCatalog {
if (isGeneric) {
properties = retrieveFlinkProperties(properties);
}
- String comment =
properties.remove(HiveTableConfig.TABLE_COMMENT);
+ String comment =
properties.remove(CatalogTableConfig.TABLE_COMMENT);
// Table schema
TableSchema tableSchema =
@@ -535,7 +536,7 @@ public class HiveCatalog extends AbstractCatalog {
Map<String, String> properties = new
HashMap<>(table.getProperties());
// Table comment
- properties.put(HiveTableConfig.TABLE_COMMENT,
table.getComment());
+ properties.put(CatalogTableConfig.TABLE_COMMENT,
table.getComment());
if (table instanceof GenericCatalogTable || table instanceof
GenericCatalogView) {
properties = maskFlinkProperties(properties);
}
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogTable.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogTable.java
index ee3d14e..7ad4025 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogTable.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogTable.java
@@ -65,11 +65,4 @@ public class HiveCatalogTable extends AbstractCatalogTable {
return Optional.ofNullable(getComment());
}
- @Override
- public Map<String, String> toProperties() {
- // TODO: output properties that are used to auto-discover
TableFactory for Hive tables.
- Map<String, String> properties = new HashMap<>();
- return properties;
- }
-
}
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTableConfig.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTableConfig.java
index 273b4e9..2bb833a 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTableConfig.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTableConfig.java
@@ -23,8 +23,6 @@ package org.apache.flink.table.catalog.hive;
*/
public class HiveTableConfig {
- // Comment of the Flink table
- public static final String TABLE_COMMENT = "comment";
public static final String DEFAULT_LIST_COLUMN_TYPES_SEPARATOR = ":";
}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogTable.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogTable.java
index c65ca73..a00ce31 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogTable.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogTable.java
@@ -19,6 +19,9 @@
package org.apache.flink.table.catalog;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.config.CatalogTableConfig;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.Schema;
import java.util.ArrayList;
import java.util.List;
@@ -82,4 +85,20 @@ public abstract class AbstractCatalogTable implements
CatalogTable {
return comment;
}
+    /** Serializes the schema, prefixed table properties, comment and partition keys into one flat map. */
+    @Override
+    public Map<String, String> toProperties() {
+        DescriptorProperties descriptor = new DescriptorProperties();
+        descriptor.putTableSchema(Schema.SCHEMA, getSchema());
+
+        // Copy first: removing the generic flag must not modify the map returned by getProperties().
+        Map<String, String> properties = new java.util.HashMap<>(getProperties());
+        properties.remove(GenericInMemoryCatalog.FLINK_IS_GENERIC_KEY);
+        descriptor.putPropertiesWithPrefix(CatalogTableConfig.TABLE_PROPERTIES, properties);
+
+        // NOTE(review): getComment() looks nullable for some tables; confirm putString accepts null.
+        descriptor.putString(CatalogTableConfig.TABLE_COMMENT, getComment());
+        descriptor.putString(CatalogTableConfig.TABLE_PARTITION_KEYS, String.join(",", partitionKeys));
+        return descriptor.asMap();
+    }
}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
index 0eff977..46b8794 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
@@ -54,13 +54,6 @@ public class GenericCatalogTable extends
AbstractCatalogTable {
}
@Override
- public Map<String, String> toProperties() {
- // TODO: Filter out ANY properties that are not needed for
table discovery.
- Map<String, String> properties = new HashMap<>();
- return properties;
- }
-
- @Override
public Optional<String> getDescription() {
return Optional.of(getComment());
}
diff --git
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/AbstractCatalogTableTest.java
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/AbstractCatalogTableTest.java
new file mode 100644
index 0000000..f905d7b
--- /dev/null
+++
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/AbstractCatalogTableTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.config.CatalogTableConfig;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.Schema;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test for {@link AbstractCatalogTable}.
+ */
+public class AbstractCatalogTableTest {
+    private static final String TEST = "test";
+
+    /**
+     * Serializes a table with {@code toProperties()} and reads each part back
+     * through {@link DescriptorProperties}.
+     */
+    @Test
+    public void testToProperties() {
+        TableSchema schema = createTableSchema();
+        Map<String, String> prop = createProperties();
+        GenericCatalogTable table = new GenericCatalogTable(schema, createPartitionKeys(), prop, TEST);
+
+        DescriptorProperties descriptorProperties = new DescriptorProperties();
+        descriptorProperties.putProperties(table.toProperties());
+
+        assertEquals(schema, descriptorProperties.getTableSchema(Schema.SCHEMA));
+        assertEquals(TEST, descriptorProperties.getString(CatalogTableConfig.TABLE_COMMENT));
+        assertEquals("second,third", descriptorProperties.getString(CatalogTableConfig.TABLE_PARTITION_KEYS));
+        assertEquals(prop, descriptorProperties.getPropertiesWithPrefix(CatalogTableConfig.TABLE_PROPERTIES));
+    }
+
+    private static Map<String, String> createProperties() {
+        // Plain mutable map; avoids creating an anonymous HashMap subclass.
+        Map<String, String> properties = new HashMap<>();
+        properties.put("k", "v");
+        return properties;
+    }
+
+    private static TableSchema createTableSchema() {
+        return TableSchema.builder()
+            .field("first", DataTypes.STRING())
+            .field("second", DataTypes.INT())
+            .field("third", DataTypes.DOUBLE())
+            .build();
+    }
+
+    private static List<String> createPartitionKeys() {
+        // Two of the schema's columns; the test expects them serialized as "second,third".
+        return Arrays.asList("second", "third");
+    }
+}
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTableConfig.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogTableConfig.java
similarity index 67%
copy from
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTableConfig.java
copy to
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogTableConfig.java
index 273b4e9..6d0c514 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTableConfig.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogTableConfig.java
@@ -16,15 +16,21 @@
* limitations under the License.
*/
-package org.apache.flink.table.catalog.hive;
+package org.apache.flink.table.catalog.config;
+
+import org.apache.flink.table.catalog.CatalogBaseTable;
/**
- * Configs for tables in Hive metastore.
+ * Config for {@link CatalogBaseTable}.
*/
-public class HiveTableConfig {
+public class CatalogTableConfig {
- // Comment of the Flink table
+ // Comment of catalog table
public static final String TABLE_COMMENT = "comment";
- public static final String DEFAULT_LIST_COLUMN_TYPES_SEPARATOR = ":";
+ // Partition keys of catalog table
+ public static final String TABLE_PARTITION_KEYS = "partition-keys";
+
+ // Prefix for properties of catalog table
+ public static final String TABLE_PROPERTIES = "properties";
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
index 7628912..446fb73 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
@@ -104,6 +104,21 @@ public class DescriptorProperties {
}
/**
+     * Adds all entries of the given map, putting the given prefix and a dot in front of each key.
+     *
+     * <p>For example: for prefix "flink" and a map of a single property with key "k" and value "v",
+     * the added property will have key "flink.k" and value "v".
+     *
+     * @param prefix text placed, followed by a dot, in front of every key
+     * @param prop entries to add
+     */
+    public void putPropertiesWithPrefix(String prefix, Map<String, String> prop) {
+        checkNotNull(prefix);
+        checkNotNull(prop);
+        prop.forEach((key, value) -> put(prefix + '.' + key, value));
+    }
+
+ /**
* Adds a class under the given key.
*/
public void putClass(String key, Class<?> clazz) {
@@ -705,6 +720,21 @@ public class DescriptorProperties {
return
optionalGet(key).orElseThrow(exceptionSupplier(key)).equals(value);
}
+    /**
+     * Collects all properties whose key starts with the given prefix followed by a dot,
+     * and returns them with that prefix and dot stripped from the keys.
+     *
+     * <p>For example, for prefix "flink" and a single property with key "flink.k" and value "v",
+     * this method returns a map with key "k" and value "v".
+     */
+    public Map<String, String> getPropertiesWithPrefix(String prefix) {
+        final String dottedPrefix = prefix + '.';
+
+        return properties.entrySet().stream()
+            .filter(entry -> entry.getKey().startsWith(dottedPrefix))
+            .collect(Collectors.toMap(entry -> entry.getKey().substring(prefix.length() + 1), Map.Entry::getValue));
+    }
+
//
--------------------------------------------------------------------------------------------
/**