This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ab79f6  [FLINK-12965][table][hive] unify catalog view implementations
1ab79f6 is described below

commit 1ab79f65fabdb47d7d041557132b4978db4bc9e2
Author: bowen.li <[email protected]>
AuthorDate: Tue Jun 25 14:13:21 2019 -0700

    [FLINK-12965][table][hive] unify catalog view implementations
    
    This PR unified implementations of CatalogView.
    
    This closes #8882.
---
 .../flink/table/catalog/hive/HiveCatalog.java      | 28 ++++-------
 .../hive/HiveCatalogGenericMetadataTest.java       | 23 ---------
 .../catalog/hive/HiveCatalogHiveMetadataTest.java  | 33 -------------
 flink-python/pyflink/table/catalog.py              | 22 +--------
 flink-python/pyflink/table/tests/test_catalog.py   |  4 +-
 .../flink/table/catalog/CatalogViewImpl.java       | 28 +++++------
 .../flink/table/catalog/GenericCatalogView.java    | 55 ----------------------
 .../flink/table/catalog/CatalogTestBase.java       | 20 ++++++++
 .../table/catalog/GenericInMemoryCatalogTest.java  | 54 ---------------------
 .../apache/flink/table/catalog/CatalogTest.java    | 22 +++------
 .../flink/table/catalog/CatalogTestUtil.java       | 16 +++++++
 11 files changed, 70 insertions(+), 235 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 2cf86f5..1a1a312 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -22,17 +22,18 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.batch.connectors.hive.HiveTableFactory;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.AbstractCatalog;
-import org.apache.flink.table.catalog.AbstractCatalogView;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
 import org.apache.flink.table.catalog.CatalogDatabaseImpl;
 import org.apache.flink.table.catalog.CatalogFunction;
 import org.apache.flink.table.catalog.CatalogPartition;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.CatalogViewImpl;
 import org.apache.flink.table.catalog.GenericCatalogFunction;
 import org.apache.flink.table.catalog.GenericCatalogPartition;
-import org.apache.flink.table.catalog.GenericCatalogView;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.config.CatalogConfig;
 import org.apache.flink.table.catalog.config.CatalogTableConfig;
@@ -498,23 +499,12 @@ public class HiveCatalog extends AbstractCatalog {
                }
 
                if (isView) {
-                       if (isGeneric) {
-                               return new GenericCatalogView(
+                       return new CatalogViewImpl(
                                        hiveTable.getViewOriginalText(),
                                        hiveTable.getViewExpandedText(),
                                        tableSchema,
                                        properties,
-                                       comment
-                               );
-                       } else {
-                               return new HiveCatalogView(
-                                       hiveTable.getViewOriginalText(),
-                                       hiveTable.getViewExpandedText(),
-                                       tableSchema,
-                                       properties,
-                                       comment
-                               );
-                       }
+                                       comment);
                } else {
                        return new CatalogTableImpl(tableSchema, partitionKeys, 
properties, comment);
                }
@@ -546,7 +536,7 @@ public class HiveCatalog extends AbstractCatalog {
 
                // Table columns and partition keys
                if (table instanceof CatalogTableImpl) {
-                       CatalogTableImpl catalogTable = (CatalogTableImpl) 
table;
+                       CatalogTable catalogTable = (CatalogTableImpl) table;
 
                        if (catalogTable.isPartitioned()) {
                                int partitionKeySize = 
catalogTable.getPartitionKeys().size();
@@ -559,8 +549,8 @@ public class HiveCatalog extends AbstractCatalog {
                                sd.setCols(allColumns);
                                hiveTable.setPartitionKeys(new ArrayList<>());
                        }
-               } else if (table instanceof AbstractCatalogView) {
-                       AbstractCatalogView view = (AbstractCatalogView) table;
+               } else if (table instanceof CatalogViewImpl) {
+                       CatalogView view = (CatalogViewImpl) table;
 
                        // TODO: [FLINK-12398] Support partitioned view in 
catalog API
                        sd.setCols(allColumns);
@@ -571,7 +561,7 @@ public class HiveCatalog extends AbstractCatalog {
                        hiveTable.setTableType(TableType.VIRTUAL_VIEW.name());
                } else {
                        throw new CatalogException(
-                               "HiveCatalog only supports CatalogTable and 
HiveCatalogView");
+                               "HiveCatalog only supports CatalogTableImpl and 
CatalogViewImpl");
                }
 
                return hiveTable;
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
index d5c2a9b..d517a95 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
@@ -25,9 +25,7 @@ import org.apache.flink.table.catalog.CatalogPartition;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.CatalogTestBase;
-import org.apache.flink.table.catalog.CatalogView;
 import org.apache.flink.table.catalog.GenericCatalogFunction;
-import org.apache.flink.table.catalog.GenericCatalogView;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.BinaryType;
@@ -39,7 +37,6 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.util.Arrays;
-import java.util.HashMap;
 
 import static org.junit.Assert.assertEquals;
 
@@ -279,26 +276,6 @@ public class HiveCatalogGenericMetadataTest extends 
CatalogTestBase {
        }
 
        @Override
-       public CatalogView createView() {
-               return new GenericCatalogView(
-                       String.format("select * from %s", t1),
-                       String.format("select * from %s.%s", TEST_CATALOG_NAME, 
path1.getFullName()),
-                       createTableSchema(),
-                       new HashMap<>(),
-                       "This is a view");
-       }
-
-       @Override
-       public CatalogView createAnotherView() {
-               return new GenericCatalogView(
-                       String.format("select * from %s", t2),
-                       String.format("select * from %s.%s", TEST_CATALOG_NAME, 
path2.getFullName()),
-                       createAnotherTableSchema(),
-                       new HashMap<>(),
-                       "This is another view");
-       }
-
-       @Override
        protected CatalogFunction createFunction() {
                return new 
GenericCatalogFunction(MyScalarFunction.class.getName());
        }
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
index 2f6f6f8..efe7fec 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.CatalogTestBase;
-import org.apache.flink.table.catalog.CatalogView;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBase;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBinary;
@@ -141,26 +140,6 @@ public class HiveCatalogHiveMetadataTest extends 
CatalogTestBase {
        }
 
        @Override
-       public CatalogView createView() {
-               return new HiveCatalogView(
-                       String.format("select * from %s", t1),
-                       String.format("select * from %s.%s", TEST_CATALOG_NAME, 
path1.getFullName()),
-                       createTableSchema(),
-                       new HashMap<>(),
-                       "This is a hive view");
-       }
-
-       @Override
-       public CatalogView createAnotherView() {
-               return new HiveCatalogView(
-                       String.format("select * from %s", t2),
-                       String.format("select * from %s.%s", TEST_CATALOG_NAME, 
path2.getFullName()),
-                       createAnotherTableSchema(),
-                       new HashMap<>(),
-                       "This is another hive view");
-       }
-
-       @Override
        protected CatalogFunction createFunction() {
                return new HiveCatalogFunction("test.class.name");
        }
@@ -176,18 +155,6 @@ public class HiveCatalogHiveMetadataTest extends 
CatalogTestBase {
        }
 
        @Override
-       protected void checkEquals(CatalogView v1, CatalogView v2) {
-               assertEquals(v1.getSchema(), v1.getSchema());
-               assertEquals(v1.getComment(), v2.getComment());
-               assertEquals(v1.getOriginalQuery(), v2.getOriginalQuery());
-               assertEquals(v1.getExpandedQuery(), v2.getExpandedQuery());
-
-               // Hive views may have properties created by itself
-               // thus properties of Hive view is a super set of those in its 
corresponding Flink view
-               
assertTrue(v2.getProperties().entrySet().containsAll(v1.getProperties().entrySet()));
-       }
-
-       @Override
        protected void checkEquals(CatalogPartition expected, CatalogPartition 
actual) {
                assertTrue(expected instanceof HiveCatalogPartition && actual 
instanceof HiveCatalogPartition);
                assertEquals(expected.getClass(), actual.getClass());
diff --git a/flink-python/pyflink/table/catalog.py 
b/flink-python/pyflink/table/catalog.py
index f84f1a5..69252db 100644
--- a/flink-python/pyflink/table/catalog.py
+++ b/flink-python/pyflink/table/catalog.py
@@ -24,7 +24,7 @@ from pyflink.table.table_schema import TableSchema
 __all__ = ['Catalog', 'CatalogDatabase', 'CatalogBaseTable', 
'CatalogPartition', 'CatalogFunction',
            'ObjectPath', 'CatalogPartitionSpec', 'CatalogTableStatistics',
            'CatalogColumnStatistics', 'HiveCatalog', 'HiveCatalogFunction',
-           'HiveCatalogPartition', 'HiveCatalogView']
+           'HiveCatalogPartition']
 
 
 class Catalog(object):
@@ -626,11 +626,7 @@ class CatalogBaseTable(object):
 
     @staticmethod
     def _get(j_catalog_base_table):
-        if j_catalog_base_table.getClass().getName() == \
-                "org.apache.flink.table.catalog.hive.HiveCatalogView":
-            return HiveCatalogView(j_hive_catalog_view=j_catalog_base_table)
-        else:
-            return CatalogBaseTable(j_catalog_base_table)
+        return CatalogBaseTable(j_catalog_base_table)
 
     def get_properties(self):
         """
@@ -1011,17 +1007,3 @@ class HiveCatalogPartition(CatalogPartition):
 
     def get_location(self):
         return self._j_catalog_partition.getLocation()
-
-
-class HiveCatalogView(CatalogBaseTable):
-    """
-    A Hive catalog view implementation.
-    """
-
-    def __init__(self, original_query=None, expanded_query=None, 
table_schema=None,
-                 properties=None, comment=None, j_hive_catalog_view=None):
-        gateway = get_gateway()
-        if j_hive_catalog_view is None:
-            j_hive_catalog_view = 
gateway.jvm.org.apache.flink.table.catalog.hive.HiveCatalogView(
-                original_query, expanded_query, table_schema._j_table_schema, 
properties, comment)
-        super(HiveCatalogView, self).__init__(j_hive_catalog_view)
diff --git a/flink-python/pyflink/table/tests/test_catalog.py 
b/flink-python/pyflink/table/tests/test_catalog.py
index a372bd4..3f5bf12 100644
--- a/flink-python/pyflink/table/tests/test_catalog.py
+++ b/flink-python/pyflink/table/tests/test_catalog.py
@@ -155,7 +155,7 @@ class CatalogTestBase(PyFlinkTestCase):
     def create_view():
         gateway = get_gateway()
         table_schema = CatalogTestBase.create_table_schema()
-        j_view = gateway.jvm.GenericCatalogView(
+        j_view = gateway.jvm.CatalogViewImpl(
             "select * from t1",
             "select * from test-catalog.db1.t1",
             table_schema._j_table_schema,
@@ -167,7 +167,7 @@ class CatalogTestBase(PyFlinkTestCase):
     def create_another_view():
         gateway = get_gateway()
         table_schema = CatalogTestBase.create_another_table_schema()
-        j_view = gateway.jvm.GenericCatalogView(
+        j_view = gateway.jvm.CatalogViewImpl(
             "select * from t2",
             "select * from test-catalog.db2.t2",
             table_schema._j_table_schema,
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogView.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogViewImpl.java
similarity index 66%
rename from 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogView.java
rename to 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogViewImpl.java
index 927e6c7..3d9e039 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogView.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogViewImpl.java
@@ -16,34 +16,36 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.catalog.hive;
+package org.apache.flink.table.catalog;
 
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.catalog.AbstractCatalogView;
-import org.apache.flink.table.catalog.CatalogBaseTable;
 
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
 /**
- * A Hive catalog view implementation.
+ * An implementation of catalog view.
  */
-public class HiveCatalogView extends AbstractCatalogView {
-
-       public HiveCatalogView(
+public class CatalogViewImpl extends AbstractCatalogView {
+       public CatalogViewImpl(
                        String originalQuery,
                        String expandedQuery,
-                       TableSchema tableSchema,
+                       TableSchema schema,
                        Map<String, String> properties,
                        String comment) {
-               super(originalQuery, expandedQuery, tableSchema, properties, 
comment);
+               super(originalQuery, expandedQuery, schema, properties, 
comment);
        }
 
        @Override
        public CatalogBaseTable copy() {
-               return new HiveCatalogView(
-                       this.getOriginalQuery(), this.getExpandedQuery(), 
this.getSchema().copy(), new HashMap<>(this.getProperties()), getComment());
+               return new CatalogViewImpl(
+                       getOriginalQuery(),
+                       getExpandedQuery(),
+                       getSchema().copy(),
+                       new HashMap<>(getProperties()),
+                       getComment()
+               );
        }
 
        @Override
@@ -53,8 +55,6 @@ public class HiveCatalogView extends AbstractCatalogView {
 
        @Override
        public Optional<String> getDetailedDescription() {
-               // TODO: return a detailed description
-               return Optional.ofNullable(getComment());
+               return Optional.of("This is a catalog view implementation");
        }
-
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogView.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogView.java
deleted file mode 100644
index aff8499..0000000
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogView.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.catalog;
-
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.catalog.config.CatalogConfig;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-
-/**
- * A generic catalog view implementation.
- */
-public class GenericCatalogView extends AbstractCatalogView {
-
-       public GenericCatalogView(String originalQuery, String expandedQuery, 
TableSchema schema,
-               Map<String, String> properties, String comment) {
-               super(originalQuery, expandedQuery, schema, properties, 
comment);
-               properties.put(CatalogConfig.IS_GENERIC, String.valueOf(true));
-       }
-
-       @Override
-       public GenericCatalogView copy() {
-               return new GenericCatalogView(getOriginalQuery(), 
getExpandedQuery(), getSchema().copy(),
-                       new HashMap<>(getProperties()), getComment());
-       }
-
-       @Override
-       public Optional<String> getDescription() {
-               return Optional.of(getComment());
-       }
-
-       @Override
-       public Optional<String> getDetailedDescription() {
-               return Optional.of("This is a catalog view in an im-memory 
catalog");
-       }
-
-}
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
index 558926e..7779f97 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
@@ -95,6 +95,26 @@ public abstract class CatalogTestBase extends CatalogTest {
                        TEST_COMMENT);
        }
 
+       @Override
+       public CatalogView createView() {
+               return new CatalogViewImpl(
+                       String.format("select * from %s", t1),
+                       String.format("select * from %s.%s", TEST_CATALOG_NAME, 
path1.getFullName()),
+                       createTableSchema(),
+                       getBatchTableProperties(),
+                       "This is a view");
+       }
+
+       @Override
+       public CatalogView createAnotherView() {
+               return new CatalogViewImpl(
+                       String.format("select * from %s", t2),
+                       String.format("select * from %s.%s", TEST_CATALOG_NAME, 
path2.getFullName()),
+                       createAnotherTableSchema(),
+                       getBatchTableProperties(),
+                       "This is another view");
+       }
+
        protected Map<String, String> getBatchTableProperties() {
                return new HashMap<String, String>() {{
                        put(IS_STREAMING, "false");
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
index d6ce1a5..950115c 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
@@ -166,64 +166,10 @@ public class GenericInMemoryCatalogTest extends 
CatalogTestBase {
        }
 
        @Override
-       public CatalogTable createTable() {
-               return new CatalogTableImpl(
-                       createTableSchema(),
-                       getBatchTableProperties(),
-                       TEST_COMMENT);
-       }
-
-       @Override
-       public CatalogTable createAnotherTable() {
-               return new CatalogTableImpl(
-                       createAnotherTableSchema(),
-                       getBatchTableProperties(),
-                       TEST_COMMENT);
-       }
-
-       @Override
-       public CatalogTable createPartitionedTable() {
-               return new CatalogTableImpl(
-                       createTableSchema(),
-                       createPartitionKeys(),
-                       getBatchTableProperties(),
-                       TEST_COMMENT);
-       }
-
-       @Override
-       public CatalogTable createAnotherPartitionedTable() {
-               return new CatalogTableImpl(
-                       createAnotherTableSchema(),
-                       createPartitionKeys(),
-                       getBatchTableProperties(),
-                       TEST_COMMENT);
-       }
-
-       @Override
        public CatalogPartition createPartition() {
                return new GenericCatalogPartition(getBatchTableProperties(), 
"Generic batch table");
        }
 
-       @Override
-       public CatalogView createView() {
-               return new GenericCatalogView(
-                       String.format("select * from %s", t1),
-                       String.format("select * from %s.%s", TEST_CATALOG_NAME, 
path1.getFullName()),
-                       createTableSchema(),
-                       new HashMap<>(),
-                       "This is a view");
-       }
-
-       @Override
-       public CatalogView createAnotherView() {
-               return new GenericCatalogView(
-                       String.format("select * from %s", t2),
-                       String.format("select * from %s.%s", TEST_CATALOG_NAME, 
path2.getFullName()),
-                       createAnotherTableSchema(),
-                       new HashMap<>(),
-                       "This is another view");
-       }
-
        private CatalogColumnStatistics createColumnStats() {
                CatalogColumnStatisticsDataBoolean booleanColStats = new 
CatalogColumnStatisticsDataBoolean(55L, 45L, 5L);
                CatalogColumnStatisticsDataLong longColStats = new 
CatalogColumnStatisticsDataLong(-123L, 763322L, 23L, 79L);
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java
index 9348d35..88c8568 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java
@@ -377,13 +377,13 @@ public abstract class CatalogTest {
                CatalogView view = createView();
                catalog.createTable(path3, view, false);
 
-               checkEquals(view, (CatalogView) catalog.getTable(path3));
+               CatalogTestUtil.checkEquals(view, (CatalogView) 
catalog.getTable(path3));
 
                CatalogView newView = createAnotherView();
                catalog.alterTable(path3, newView, false);
 
                assertNotEquals(view, catalog.getTable(path3));
-               checkEquals(newView, (CatalogView) catalog.getTable(path3));
+               CatalogTestUtil.checkEquals(newView, (CatalogView) 
catalog.getTable(path3));
        }
 
        @Test
@@ -493,7 +493,7 @@ public abstract class CatalogTest {
                catalog.createTable(path1, view, false);
 
                assertTrue(catalog.getTable(path1) instanceof CatalogView);
-               checkEquals(view, (CatalogView) catalog.getTable(path1));
+               CatalogTestUtil.checkEquals(view, (CatalogView) 
catalog.getTable(path1));
        }
 
        @Test
@@ -523,12 +523,12 @@ public abstract class CatalogTest {
                catalog.createTable(path1, view, false);
 
                assertTrue(catalog.getTable(path1) instanceof CatalogView);
-               checkEquals(view, (CatalogView) catalog.getTable(path1));
+               CatalogTestUtil.checkEquals(view, (CatalogView) 
catalog.getTable(path1));
 
                catalog.createTable(path1, createAnotherView(), true);
 
                assertTrue(catalog.getTable(path1) instanceof CatalogView);
-               checkEquals(view, (CatalogView) catalog.getTable(path1));
+               CatalogTestUtil.checkEquals(view, (CatalogView) 
catalog.getTable(path1));
        }
 
        @Test
@@ -550,13 +550,13 @@ public abstract class CatalogTest {
                CatalogView view = createView();
                catalog.createTable(path1, view, false);
 
-               checkEquals(view, (CatalogView) catalog.getTable(path1));
+               CatalogTestUtil.checkEquals(view, (CatalogView) 
catalog.getTable(path1));
 
                CatalogView newView = createAnotherView();
                catalog.alterTable(path1, newView, false);
 
                assertTrue(catalog.getTable(path1) instanceof CatalogView);
-               checkEquals(newView, (CatalogView) catalog.getTable(path1));
+               CatalogTestUtil.checkEquals(newView, (CatalogView) 
catalog.getTable(path1));
        }
 
        @Test
@@ -1325,14 +1325,6 @@ public abstract class CatalogTest {
        // ------ equality check utils ------
        // Can be overriden by sub test class
 
-       protected void checkEquals(CatalogView v1, CatalogView v2) {
-               assertEquals(v1.getSchema(), v1.getSchema());
-               assertEquals(v1.getProperties(), v2.getProperties());
-               assertEquals(v1.getComment(), v2.getComment());
-               assertEquals(v1.getOriginalQuery(), v2.getOriginalQuery());
-               assertEquals(v1.getExpandedQuery(), v2.getExpandedQuery());
-       }
-
        protected void checkEquals(CatalogFunction f1, CatalogFunction f2) {
                assertEquals(f1.getClassName(), f2.getClassName());
                assertEquals(f1.getProperties(), f2.getProperties());
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
index e79002f..905a44c 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
@@ -61,6 +61,22 @@ public class CatalogTestUtil {
                }
        }
 
+       public static void checkEquals(CatalogView v1, CatalogView v2) {
+               assertEquals(v1.getClass(), v2.getClass());
+               assertEquals(v1.getSchema(), v1.getSchema());
+               assertEquals(v1.getComment(), v2.getComment());
+               assertEquals(v1.getOriginalQuery(), v2.getOriginalQuery());
+               assertEquals(v1.getExpandedQuery(), v2.getExpandedQuery());
+
+               // Hive tables may have properties created by itself
+               // thus properties of Hive table is a super set of those in its 
corresponding Flink table
+               if 
(Boolean.valueOf(v1.getProperties().get(CatalogConfig.IS_GENERIC))) {
+                       assertEquals(v1.getProperties(), v2.getProperties());
+               } else {
+                       
assertTrue(v2.getProperties().entrySet().containsAll(v1.getProperties().entrySet()));
+               }
+       }
+
        public static void checkEquals(TableStats ts1, TableStats ts2) {
                assertEquals(ts1.getRowCount(), ts2.getRowCount());
                assertEquals(ts1.getColumnStats().size(), 
ts2.getColumnStats().size());

Reply via email to