This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 26de0fa  [FLINK-12892][table][hive] serialize catalog table to 
properties for table discovery service
26de0fa is described below

commit 26de0faab65dfdc08d91393824944ca2fd43fb9b
Author: bowen.li <[email protected]>
AuthorDate: Tue Jun 18 11:57:27 2019 -0700

    [FLINK-12892][table][hive] serialize catalog table to properties for table 
discovery service
    
    This PR enables serialization of catalog table to properties for table 
discovery service and table factory to initiate source/sink from those 
properties.
    
    This closes #8784.
---
 .../flink/table/catalog/hive/HiveCatalog.java      |  5 +-
 .../flink/table/catalog/hive/HiveCatalogTable.java |  7 --
 .../flink/table/catalog/hive/HiveTableConfig.java  |  2 -
 .../flink/table/catalog/AbstractCatalogTable.java  | 19 +++++
 .../flink/table/catalog/GenericCatalogTable.java   |  7 --
 .../table/catalog/AbstractCatalogTableTest.java    | 80 ++++++++++++++++++++++
 .../table/catalog/config/CatalogTableConfig.java   | 16 +++--
 .../table/descriptors/DescriptorProperties.java    | 30 ++++++++
 8 files changed, 143 insertions(+), 23 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 0ee99df..3529f46 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -35,6 +35,7 @@ import org.apache.flink.table.catalog.GenericCatalogTable;
 import org.apache.flink.table.catalog.GenericCatalogView;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.config.CatalogTableConfig;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
@@ -488,7 +489,7 @@ public class HiveCatalog extends AbstractCatalog {
                if (isGeneric) {
                        properties = retrieveFlinkProperties(properties);
                }
-               String comment = 
properties.remove(HiveTableConfig.TABLE_COMMENT);
+               String comment = 
properties.remove(CatalogTableConfig.TABLE_COMMENT);
 
                // Table schema
                TableSchema tableSchema =
@@ -535,7 +536,7 @@ public class HiveCatalog extends AbstractCatalog {
 
                Map<String, String> properties = new 
HashMap<>(table.getProperties());
                // Table comment
-               properties.put(HiveTableConfig.TABLE_COMMENT, 
table.getComment());
+               properties.put(CatalogTableConfig.TABLE_COMMENT, 
table.getComment());
                if (table instanceof GenericCatalogTable || table instanceof 
GenericCatalogView) {
                        properties = maskFlinkProperties(properties);
                }
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogTable.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogTable.java
index ee3d14e..7ad4025 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogTable.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogTable.java
@@ -65,11 +65,4 @@ public class HiveCatalogTable extends AbstractCatalogTable {
                return Optional.ofNullable(getComment());
        }
 
-       @Override
-       public Map<String, String> toProperties() {
-               // TODO: output properties that are used to auto-discover 
TableFactory for Hive tables.
-               Map<String, String> properties = new HashMap<>();
-               return properties;
-       }
-
 }
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTableConfig.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTableConfig.java
index 273b4e9..2bb833a 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTableConfig.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTableConfig.java
@@ -23,8 +23,6 @@ package org.apache.flink.table.catalog.hive;
  */
 public class HiveTableConfig {
 
-       // Comment of the Flink table
-       public static final String TABLE_COMMENT = "comment";
        public static final String DEFAULT_LIST_COLUMN_TYPES_SEPARATOR = ":";
 
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogTable.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogTable.java
index c65ca73..a00ce31 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogTable.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/AbstractCatalogTable.java
@@ -19,6 +19,9 @@
 package org.apache.flink.table.catalog;
 
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.config.CatalogTableConfig;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.Schema;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -82,4 +85,20 @@ public abstract class AbstractCatalogTable implements 
CatalogTable {
                return comment;
        }
 
+       @Override
+       public Map<String, String> toProperties() {
+               DescriptorProperties descriptor = new DescriptorProperties();
+
+               descriptor.putTableSchema(Schema.SCHEMA, getSchema());
+
+               Map<String, String> properties = getProperties();
+               properties.remove(GenericInMemoryCatalog.FLINK_IS_GENERIC_KEY);
+
+               
descriptor.putPropertiesWithPrefix(CatalogTableConfig.TABLE_PROPERTIES, 
properties);
+
+               descriptor.putString(CatalogTableConfig.TABLE_COMMENT, 
getComment());
+               descriptor.putString(CatalogTableConfig.TABLE_PARTITION_KEYS, 
String.join(",", partitionKeys));
+
+               return descriptor.asMap();
+       }
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
index 0eff977..46b8794 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
@@ -54,13 +54,6 @@ public class GenericCatalogTable extends 
AbstractCatalogTable {
        }
 
        @Override
-       public Map<String, String> toProperties() {
-               // TODO: Filter out ANY properties that are not needed for 
table discovery.
-               Map<String, String> properties = new HashMap<>();
-               return properties;
-       }
-
-       @Override
        public Optional<String> getDescription() {
                return Optional.of(getComment());
        }
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/AbstractCatalogTableTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/AbstractCatalogTableTest.java
new file mode 100644
index 0000000..f905d7b
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/AbstractCatalogTableTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.config.CatalogTableConfig;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.Schema;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test for {@link AbstractCatalogTable}.
+ */
+public class AbstractCatalogTableTest {
+       private static final String TEST = "test";
+
+       @Test
+       public void testToProperties() {
+               TableSchema schema = createTableSchema();
+               Map<String, String> prop = createProperties();
+               GenericCatalogTable table = new GenericCatalogTable(
+                       schema,
+                       createPartitionKeys(),
+                       prop,
+                       TEST
+               );
+
+               DescriptorProperties descriptorProperties = new 
DescriptorProperties();
+               descriptorProperties.putProperties(table.toProperties());
+
+               assertEquals(schema, 
descriptorProperties.getTableSchema(Schema.SCHEMA));
+               assertEquals(TEST, 
descriptorProperties.getString(CatalogTableConfig.TABLE_COMMENT));
+               assertEquals("second,third", 
descriptorProperties.getString(CatalogTableConfig.TABLE_PARTITION_KEYS));
+               assertEquals(prop, 
descriptorProperties.getPropertiesWithPrefix(CatalogTableConfig.TABLE_PROPERTIES));
+       }
+
+       private static Map<String, String> createProperties() {
+               return new HashMap<String, String>() {{
+                       put("k", "v");
+               }};
+       }
+
+       private static TableSchema createTableSchema() {
+               return TableSchema.builder()
+                       .field("first", DataTypes.STRING())
+                       .field("second", DataTypes.INT())
+                       .field("third", DataTypes.DOUBLE())
+                       .build();
+       }
+
+       private static List<String> createPartitionKeys() {
+               return Arrays.asList("second", "third");
+       }
+
+}
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTableConfig.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogTableConfig.java
similarity index 67%
copy from 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTableConfig.java
copy to 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogTableConfig.java
index 273b4e9..6d0c514 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTableConfig.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogTableConfig.java
@@ -16,15 +16,21 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.catalog.hive;
+package org.apache.flink.table.catalog.config;
+
+import org.apache.flink.table.catalog.CatalogBaseTable;
 
 /**
- * Configs for tables in Hive metastore.
+ * Config for {@link CatalogBaseTable}.
  */
-public class HiveTableConfig {
+public class CatalogTableConfig {
 
-       // Comment of the Flink table
+       // Comment of catalog table
        public static final String TABLE_COMMENT = "comment";
-       public static final String DEFAULT_LIST_COLUMN_TYPES_SEPARATOR = ":";
 
+       // Partition keys of catalog table
+       public static final String TABLE_PARTITION_KEYS = "partition-keys";
+
+       // Prefix for properties of catalog table
+       public static final String TABLE_PROPERTIES = "properties";
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
index 7628912..446fb73 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
@@ -104,6 +104,21 @@ public class DescriptorProperties {
        }
 
        /**
+        * Adds a properties map by appending the given prefix to element keys 
with a dot.
+        *
+        * <p>For example: for prefix "flink" and a map of a single property 
with key "k" and value "v",
+        * the added property will have key "flink.k" and value "v".
+        */
+       public void putPropertiesWithPrefix(String prefix, Map<String, String> 
prop) {
+               checkNotNull(prefix);
+               checkNotNull(prop);
+
+               for (Map.Entry<String, String> e : prop.entrySet()) {
+                       put(String.format("%s.%s", prefix, e.getKey()), 
e.getValue());
+               }
+       }
+
+       /**
         * Adds a class under the given key.
         */
        public void putClass(String key, Class<?> clazz) {
@@ -705,6 +720,21 @@ public class DescriptorProperties {
                return 
optionalGet(key).orElseThrow(exceptionSupplier(key)).equals(value);
        }
 
+       /**
+        * Returns a map of properties whose keys start with the given prefix;
+        * the prefix is removed from the keys upon return.
+        *
+        * <p>For example, for prefix "flink" and a map of a single property 
with key "flink.k" and value "v",
+        * this method will return it as key "k" and value "v" by identifying 
and removing the prefix "flink".
+        */
+       public Map<String, String> getPropertiesWithPrefix(String prefix) {
+               String prefixWithDot = prefix + '.';
+
+               return properties.entrySet().stream()
+                       .filter(e -> e.getKey().startsWith(prefixWithDot))
+                       .collect(Collectors.toMap(e -> 
e.getKey().substring(prefix.length() + 1), Map.Entry::getValue));
+       }
+
        // 
--------------------------------------------------------------------------------------------
 
        /**

Reply via email to