This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 1ab79f6 [FLINK-12965][table][hive] unify catalog view implementations
1ab79f6 is described below
commit 1ab79f65fabdb47d7d041557132b4978db4bc9e2
Author: bowen.li <[email protected]>
AuthorDate: Tue Jun 25 14:13:21 2019 -0700
[FLINK-12965][table][hive] unify catalog view implementations
This PR unifies the implementations of CatalogView: GenericCatalogView and HiveCatalogView are replaced by a single CatalogViewImpl.
This closes #8882.
---
.../flink/table/catalog/hive/HiveCatalog.java | 28 ++++-------
.../hive/HiveCatalogGenericMetadataTest.java | 23 ---------
.../catalog/hive/HiveCatalogHiveMetadataTest.java | 33 -------------
flink-python/pyflink/table/catalog.py | 22 +--------
flink-python/pyflink/table/tests/test_catalog.py | 4 +-
.../flink/table/catalog/CatalogViewImpl.java | 28 +++++------
.../flink/table/catalog/GenericCatalogView.java | 55 ----------------------
.../flink/table/catalog/CatalogTestBase.java | 20 ++++++++
.../table/catalog/GenericInMemoryCatalogTest.java | 54 ---------------------
.../apache/flink/table/catalog/CatalogTest.java | 22 +++------
.../flink/table/catalog/CatalogTestUtil.java | 16 +++++++
11 files changed, 70 insertions(+), 235 deletions(-)
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 2cf86f5..1a1a312 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -22,17 +22,18 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.batch.connectors.hive.HiveTableFactory;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.AbstractCatalog;
-import org.apache.flink.table.catalog.AbstractCatalogView;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.CatalogViewImpl;
import org.apache.flink.table.catalog.GenericCatalogFunction;
import org.apache.flink.table.catalog.GenericCatalogPartition;
-import org.apache.flink.table.catalog.GenericCatalogView;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.config.CatalogConfig;
import org.apache.flink.table.catalog.config.CatalogTableConfig;
@@ -498,23 +499,12 @@ public class HiveCatalog extends AbstractCatalog {
}
if (isView) {
- if (isGeneric) {
- return new GenericCatalogView(
+ return new CatalogViewImpl(
hiveTable.getViewOriginalText(),
hiveTable.getViewExpandedText(),
tableSchema,
properties,
- comment
- );
- } else {
- return new HiveCatalogView(
- hiveTable.getViewOriginalText(),
- hiveTable.getViewExpandedText(),
- tableSchema,
- properties,
- comment
- );
- }
+ comment);
} else {
return new CatalogTableImpl(tableSchema, partitionKeys,
properties, comment);
}
@@ -546,7 +536,7 @@ public class HiveCatalog extends AbstractCatalog {
// Table columns and partition keys
if (table instanceof CatalogTableImpl) {
- CatalogTableImpl catalogTable = (CatalogTableImpl)
table;
+ CatalogTable catalogTable = (CatalogTableImpl) table;
if (catalogTable.isPartitioned()) {
int partitionKeySize =
catalogTable.getPartitionKeys().size();
@@ -559,8 +549,8 @@ public class HiveCatalog extends AbstractCatalog {
sd.setCols(allColumns);
hiveTable.setPartitionKeys(new ArrayList<>());
}
- } else if (table instanceof AbstractCatalogView) {
- AbstractCatalogView view = (AbstractCatalogView) table;
+ } else if (table instanceof CatalogViewImpl) {
+ CatalogView view = (CatalogViewImpl) table;
// TODO: [FLINK-12398] Support partitioned view in
catalog API
sd.setCols(allColumns);
@@ -571,7 +561,7 @@ public class HiveCatalog extends AbstractCatalog {
hiveTable.setTableType(TableType.VIRTUAL_VIEW.name());
} else {
throw new CatalogException(
- "HiveCatalog only supports CatalogTable and
HiveCatalogView");
+ "HiveCatalog only supports CatalogTableImpl and
CatalogViewImpl");
}
return hiveTable;
diff --git
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
index d5c2a9b..d517a95 100644
---
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
+++
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
@@ -25,9 +25,7 @@ import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.CatalogTestBase;
-import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.GenericCatalogFunction;
-import org.apache.flink.table.catalog.GenericCatalogView;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.BinaryType;
@@ -39,7 +37,6 @@ import org.junit.BeforeClass;
import org.junit.Test;
import java.util.Arrays;
-import java.util.HashMap;
import static org.junit.Assert.assertEquals;
@@ -279,26 +276,6 @@ public class HiveCatalogGenericMetadataTest extends
CatalogTestBase {
}
@Override
- public CatalogView createView() {
- return new GenericCatalogView(
- String.format("select * from %s", t1),
- String.format("select * from %s.%s", TEST_CATALOG_NAME,
path1.getFullName()),
- createTableSchema(),
- new HashMap<>(),
- "This is a view");
- }
-
- @Override
- public CatalogView createAnotherView() {
- return new GenericCatalogView(
- String.format("select * from %s", t2),
- String.format("select * from %s.%s", TEST_CATALOG_NAME,
path2.getFullName()),
- createAnotherTableSchema(),
- new HashMap<>(),
- "This is another view");
- }
-
- @Override
protected CatalogFunction createFunction() {
return new
GenericCatalogFunction(MyScalarFunction.class.getName());
}
diff --git
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
index 2f6f6f8..efe7fec 100644
---
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
+++
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.CatalogTestBase;
-import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBase;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBinary;
@@ -141,26 +140,6 @@ public class HiveCatalogHiveMetadataTest extends
CatalogTestBase {
}
@Override
- public CatalogView createView() {
- return new HiveCatalogView(
- String.format("select * from %s", t1),
- String.format("select * from %s.%s", TEST_CATALOG_NAME,
path1.getFullName()),
- createTableSchema(),
- new HashMap<>(),
- "This is a hive view");
- }
-
- @Override
- public CatalogView createAnotherView() {
- return new HiveCatalogView(
- String.format("select * from %s", t2),
- String.format("select * from %s.%s", TEST_CATALOG_NAME,
path2.getFullName()),
- createAnotherTableSchema(),
- new HashMap<>(),
- "This is another hive view");
- }
-
- @Override
protected CatalogFunction createFunction() {
return new HiveCatalogFunction("test.class.name");
}
@@ -176,18 +155,6 @@ public class HiveCatalogHiveMetadataTest extends
CatalogTestBase {
}
@Override
- protected void checkEquals(CatalogView v1, CatalogView v2) {
- assertEquals(v1.getSchema(), v1.getSchema());
- assertEquals(v1.getComment(), v2.getComment());
- assertEquals(v1.getOriginalQuery(), v2.getOriginalQuery());
- assertEquals(v1.getExpandedQuery(), v2.getExpandedQuery());
-
- // Hive views may have properties created by itself
- // thus properties of Hive view is a super set of those in its
corresponding Flink view
-
assertTrue(v2.getProperties().entrySet().containsAll(v1.getProperties().entrySet()));
- }
-
- @Override
protected void checkEquals(CatalogPartition expected, CatalogPartition
actual) {
assertTrue(expected instanceof HiveCatalogPartition && actual
instanceof HiveCatalogPartition);
assertEquals(expected.getClass(), actual.getClass());
diff --git a/flink-python/pyflink/table/catalog.py
b/flink-python/pyflink/table/catalog.py
index f84f1a5..69252db 100644
--- a/flink-python/pyflink/table/catalog.py
+++ b/flink-python/pyflink/table/catalog.py
@@ -24,7 +24,7 @@ from pyflink.table.table_schema import TableSchema
__all__ = ['Catalog', 'CatalogDatabase', 'CatalogBaseTable',
'CatalogPartition', 'CatalogFunction',
'ObjectPath', 'CatalogPartitionSpec', 'CatalogTableStatistics',
'CatalogColumnStatistics', 'HiveCatalog', 'HiveCatalogFunction',
- 'HiveCatalogPartition', 'HiveCatalogView']
+ 'HiveCatalogPartition']
class Catalog(object):
@@ -626,11 +626,7 @@ class CatalogBaseTable(object):
@staticmethod
def _get(j_catalog_base_table):
- if j_catalog_base_table.getClass().getName() == \
- "org.apache.flink.table.catalog.hive.HiveCatalogView":
- return HiveCatalogView(j_hive_catalog_view=j_catalog_base_table)
- else:
- return CatalogBaseTable(j_catalog_base_table)
+ return CatalogBaseTable(j_catalog_base_table)
def get_properties(self):
"""
@@ -1011,17 +1007,3 @@ class HiveCatalogPartition(CatalogPartition):
def get_location(self):
return self._j_catalog_partition.getLocation()
-
-
-class HiveCatalogView(CatalogBaseTable):
- """
- A Hive catalog view implementation.
- """
-
- def __init__(self, original_query=None, expanded_query=None,
table_schema=None,
- properties=None, comment=None, j_hive_catalog_view=None):
- gateway = get_gateway()
- if j_hive_catalog_view is None:
- j_hive_catalog_view =
gateway.jvm.org.apache.flink.table.catalog.hive.HiveCatalogView(
- original_query, expanded_query, table_schema._j_table_schema,
properties, comment)
- super(HiveCatalogView, self).__init__(j_hive_catalog_view)
diff --git a/flink-python/pyflink/table/tests/test_catalog.py
b/flink-python/pyflink/table/tests/test_catalog.py
index a372bd4..3f5bf12 100644
--- a/flink-python/pyflink/table/tests/test_catalog.py
+++ b/flink-python/pyflink/table/tests/test_catalog.py
@@ -155,7 +155,7 @@ class CatalogTestBase(PyFlinkTestCase):
def create_view():
gateway = get_gateway()
table_schema = CatalogTestBase.create_table_schema()
- j_view = gateway.jvm.GenericCatalogView(
+ j_view = gateway.jvm.CatalogViewImpl(
"select * from t1",
"select * from test-catalog.db1.t1",
table_schema._j_table_schema,
@@ -167,7 +167,7 @@ class CatalogTestBase(PyFlinkTestCase):
def create_another_view():
gateway = get_gateway()
table_schema = CatalogTestBase.create_another_table_schema()
- j_view = gateway.jvm.GenericCatalogView(
+ j_view = gateway.jvm.CatalogViewImpl(
"select * from t2",
"select * from test-catalog.db2.t2",
table_schema._j_table_schema,
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogView.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogViewImpl.java
similarity index 66%
rename from
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogView.java
rename to
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogViewImpl.java
index 927e6c7..3d9e039 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogView.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogViewImpl.java
@@ -16,34 +16,36 @@
* limitations under the License.
*/
-package org.apache.flink.table.catalog.hive;
+package org.apache.flink.table.catalog;
import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.catalog.AbstractCatalogView;
-import org.apache.flink.table.catalog.CatalogBaseTable;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
/**
- * A Hive catalog view implementation.
+ * An implementation of catalog view.
*/
-public class HiveCatalogView extends AbstractCatalogView {
-
- public HiveCatalogView(
+public class CatalogViewImpl extends AbstractCatalogView {
+ public CatalogViewImpl(
String originalQuery,
String expandedQuery,
- TableSchema tableSchema,
+ TableSchema schema,
Map<String, String> properties,
String comment) {
- super(originalQuery, expandedQuery, tableSchema, properties,
comment);
+ super(originalQuery, expandedQuery, schema, properties,
comment);
}
@Override
public CatalogBaseTable copy() {
- return new HiveCatalogView(
- this.getOriginalQuery(), this.getExpandedQuery(),
this.getSchema().copy(), new HashMap<>(this.getProperties()), getComment());
+ return new CatalogViewImpl(
+ getOriginalQuery(),
+ getExpandedQuery(),
+ getSchema().copy(),
+ new HashMap<>(getProperties()),
+ getComment()
+ );
}
@Override
@@ -53,8 +55,6 @@ public class HiveCatalogView extends AbstractCatalogView {
@Override
public Optional<String> getDetailedDescription() {
- // TODO: return a detailed description
- return Optional.ofNullable(getComment());
+ return Optional.of("This is a catalog view implementation");
}
-
}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogView.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogView.java
deleted file mode 100644
index aff8499..0000000
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogView.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.catalog;
-
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.catalog.config.CatalogConfig;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-
-/**
- * A generic catalog view implementation.
- */
-public class GenericCatalogView extends AbstractCatalogView {
-
- public GenericCatalogView(String originalQuery, String expandedQuery,
TableSchema schema,
- Map<String, String> properties, String comment) {
- super(originalQuery, expandedQuery, schema, properties,
comment);
- properties.put(CatalogConfig.IS_GENERIC, String.valueOf(true));
- }
-
- @Override
- public GenericCatalogView copy() {
- return new GenericCatalogView(getOriginalQuery(),
getExpandedQuery(), getSchema().copy(),
- new HashMap<>(getProperties()), getComment());
- }
-
- @Override
- public Optional<String> getDescription() {
- return Optional.of(getComment());
- }
-
- @Override
- public Optional<String> getDetailedDescription() {
- return Optional.of("This is a catalog view in an im-memory
catalog");
- }
-
-}
diff --git
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
index 558926e..7779f97 100644
---
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
+++
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
@@ -95,6 +95,26 @@ public abstract class CatalogTestBase extends CatalogTest {
TEST_COMMENT);
}
+ @Override
+ public CatalogView createView() {
+ return new CatalogViewImpl(
+ String.format("select * from %s", t1),
+ String.format("select * from %s.%s", TEST_CATALOG_NAME,
path1.getFullName()),
+ createTableSchema(),
+ getBatchTableProperties(),
+ "This is a view");
+ }
+
+ @Override
+ public CatalogView createAnotherView() {
+ return new CatalogViewImpl(
+ String.format("select * from %s", t2),
+ String.format("select * from %s.%s", TEST_CATALOG_NAME,
path2.getFullName()),
+ createAnotherTableSchema(),
+ getBatchTableProperties(),
+ "This is another view");
+ }
+
protected Map<String, String> getBatchTableProperties() {
return new HashMap<String, String>() {{
put(IS_STREAMING, "false");
diff --git
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
index d6ce1a5..950115c 100644
---
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
+++
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
@@ -166,64 +166,10 @@ public class GenericInMemoryCatalogTest extends
CatalogTestBase {
}
@Override
- public CatalogTable createTable() {
- return new CatalogTableImpl(
- createTableSchema(),
- getBatchTableProperties(),
- TEST_COMMENT);
- }
-
- @Override
- public CatalogTable createAnotherTable() {
- return new CatalogTableImpl(
- createAnotherTableSchema(),
- getBatchTableProperties(),
- TEST_COMMENT);
- }
-
- @Override
- public CatalogTable createPartitionedTable() {
- return new CatalogTableImpl(
- createTableSchema(),
- createPartitionKeys(),
- getBatchTableProperties(),
- TEST_COMMENT);
- }
-
- @Override
- public CatalogTable createAnotherPartitionedTable() {
- return new CatalogTableImpl(
- createAnotherTableSchema(),
- createPartitionKeys(),
- getBatchTableProperties(),
- TEST_COMMENT);
- }
-
- @Override
public CatalogPartition createPartition() {
return new GenericCatalogPartition(getBatchTableProperties(),
"Generic batch table");
}
- @Override
- public CatalogView createView() {
- return new GenericCatalogView(
- String.format("select * from %s", t1),
- String.format("select * from %s.%s", TEST_CATALOG_NAME,
path1.getFullName()),
- createTableSchema(),
- new HashMap<>(),
- "This is a view");
- }
-
- @Override
- public CatalogView createAnotherView() {
- return new GenericCatalogView(
- String.format("select * from %s", t2),
- String.format("select * from %s.%s", TEST_CATALOG_NAME,
path2.getFullName()),
- createAnotherTableSchema(),
- new HashMap<>(),
- "This is another view");
- }
-
private CatalogColumnStatistics createColumnStats() {
CatalogColumnStatisticsDataBoolean booleanColStats = new
CatalogColumnStatisticsDataBoolean(55L, 45L, 5L);
CatalogColumnStatisticsDataLong longColStats = new
CatalogColumnStatisticsDataLong(-123L, 763322L, 23L, 79L);
diff --git
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java
index 9348d35..88c8568 100644
---
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java
+++
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java
@@ -377,13 +377,13 @@ public abstract class CatalogTest {
CatalogView view = createView();
catalog.createTable(path3, view, false);
- checkEquals(view, (CatalogView) catalog.getTable(path3));
+ CatalogTestUtil.checkEquals(view, (CatalogView)
catalog.getTable(path3));
CatalogView newView = createAnotherView();
catalog.alterTable(path3, newView, false);
assertNotEquals(view, catalog.getTable(path3));
- checkEquals(newView, (CatalogView) catalog.getTable(path3));
+ CatalogTestUtil.checkEquals(newView, (CatalogView)
catalog.getTable(path3));
}
@Test
@@ -493,7 +493,7 @@ public abstract class CatalogTest {
catalog.createTable(path1, view, false);
assertTrue(catalog.getTable(path1) instanceof CatalogView);
- checkEquals(view, (CatalogView) catalog.getTable(path1));
+ CatalogTestUtil.checkEquals(view, (CatalogView)
catalog.getTable(path1));
}
@Test
@@ -523,12 +523,12 @@ public abstract class CatalogTest {
catalog.createTable(path1, view, false);
assertTrue(catalog.getTable(path1) instanceof CatalogView);
- checkEquals(view, (CatalogView) catalog.getTable(path1));
+ CatalogTestUtil.checkEquals(view, (CatalogView)
catalog.getTable(path1));
catalog.createTable(path1, createAnotherView(), true);
assertTrue(catalog.getTable(path1) instanceof CatalogView);
- checkEquals(view, (CatalogView) catalog.getTable(path1));
+ CatalogTestUtil.checkEquals(view, (CatalogView)
catalog.getTable(path1));
}
@Test
@@ -550,13 +550,13 @@ public abstract class CatalogTest {
CatalogView view = createView();
catalog.createTable(path1, view, false);
- checkEquals(view, (CatalogView) catalog.getTable(path1));
+ CatalogTestUtil.checkEquals(view, (CatalogView)
catalog.getTable(path1));
CatalogView newView = createAnotherView();
catalog.alterTable(path1, newView, false);
assertTrue(catalog.getTable(path1) instanceof CatalogView);
- checkEquals(newView, (CatalogView) catalog.getTable(path1));
+ CatalogTestUtil.checkEquals(newView, (CatalogView)
catalog.getTable(path1));
}
@Test
@@ -1325,14 +1325,6 @@ public abstract class CatalogTest {
// ------ equality check utils ------
// Can be overriden by sub test class
- protected void checkEquals(CatalogView v1, CatalogView v2) {
- assertEquals(v1.getSchema(), v1.getSchema());
- assertEquals(v1.getProperties(), v2.getProperties());
- assertEquals(v1.getComment(), v2.getComment());
- assertEquals(v1.getOriginalQuery(), v2.getOriginalQuery());
- assertEquals(v1.getExpandedQuery(), v2.getExpandedQuery());
- }
-
protected void checkEquals(CatalogFunction f1, CatalogFunction f2) {
assertEquals(f1.getClassName(), f2.getClassName());
assertEquals(f1.getProperties(), f2.getProperties());
diff --git
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
index e79002f..905a44c 100644
---
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
+++
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
@@ -61,6 +61,22 @@ public class CatalogTestUtil {
}
}
+ public static void checkEquals(CatalogView v1, CatalogView v2) {
+ assertEquals(v1.getClass(), v2.getClass());
+ assertEquals(v1.getSchema(), v1.getSchema());
+ assertEquals(v1.getComment(), v2.getComment());
+ assertEquals(v1.getOriginalQuery(), v2.getOriginalQuery());
+ assertEquals(v1.getExpandedQuery(), v2.getExpandedQuery());
+
+ // Hive tables may have properties created by itself
+ // thus properties of Hive table is a super set of those in its
corresponding Flink table
+ if
(Boolean.valueOf(v1.getProperties().get(CatalogConfig.IS_GENERIC))) {
+ assertEquals(v1.getProperties(), v2.getProperties());
+ } else {
+
assertTrue(v2.getProperties().entrySet().containsAll(v1.getProperties().entrySet()));
+ }
+ }
+
public static void checkEquals(TableStats ts1, TableStats ts2) {
assertEquals(ts1.getRowCount(), ts2.getRowCount());
assertEquals(ts1.getColumnStats().size(),
ts2.getColumnStats().size());