This is an automated email from the ASF dual-hosted git repository.

lirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b68682  [FLINK-22884][hive] HiveCatalog should mark views as generic and store schema in properties
0b68682 is described below

commit 0b686824c33ad976cefb588c5a272a09de4b23ca
Author: Rui Li <[email protected]>
AuthorDate: Fri Jun 11 21:25:46 2021 +0800

    [FLINK-22884][hive] HiveCatalog should mark views as generic and store schema in properties
    
    This closes #16149
---
 .../flink/table/catalog/hive/HiveCatalog.java      |  1 -
 .../table/catalog/hive/util/HiveTableUtil.java     | 20 +++++-
 .../delegation/hive/DDLOperationConverter.java     |  1 -
 .../flink/connectors/hive/HiveDialectITCase.java   | 18 +++---
 .../hive/HiveCatalogGenericMetadataTest.java       | 19 ++++++
 .../catalog/hive/HiveCatalogHiveMetadataTest.java  | 73 ++++++++++++++++++++++
 .../table/catalog/hive/HiveCatalogITCase.java      | 58 +++++++++++++++++
 .../flink/table/catalog/CatalogTestBase.java       |  4 +-
 8 files changed, 180 insertions(+), 14 deletions(-)
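
For context, the user-visible effect of this change: a view created through
HiveCatalog is now always marked generic, and its schema round-trips through
table properties rather than the Hive storage descriptor. Below is a minimal
sketch of what the new tests in this commit assert; the class and method
names are illustrative only, and the HiveCatalog instance is assumed to be
supplied by the caller (e.g. built from a HiveConf):

    import org.apache.flink.table.catalog.CatalogBaseTable;
    import org.apache.flink.table.catalog.CatalogView;
    import org.apache.flink.table.catalog.ObjectPath;
    import org.apache.flink.table.catalog.hive.HiveCatalog;

    class ViewRoundTripSketch {
        // Sketch only: after this commit, a view created via Flink SQL
        // (e.g. "create view v1 as select x from src") is stored as a
        // generic catalog object.
        static void checkViewIsGeneric(HiveCatalog hiveCatalog) throws Exception {
            CatalogBaseTable table = hiveCatalog.getTable(new ObjectPath("default", "v1"));
            // Retrieval yields a CatalogView whose schema is read back from
            // the masked Flink properties, not from the Hive SD columns.
            if (!(table instanceof CatalogView)) {
                throw new AssertionError("expected a view");
            }
            System.out.println(((CatalogView) table).getUnresolvedSchema());
        }
    }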

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index e6bb77e..cf84056 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -705,7 +705,6 @@ public class HiveCatalog extends AbstractCatalog {
             Table table = client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
             boolean isHiveTable;
             if (table.getParameters().containsKey(CatalogPropertiesUtil.IS_GENERIC)) {
-                // check is_generic to be backward compatible
                 isHiveTable =
                         !Boolean.parseBoolean(
                                 table.getParameters().remove(CatalogPropertiesUtil.IS_GENERIC));
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
index 2e38220..d421194 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
@@ -78,6 +78,9 @@ import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableS
 import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.TABLE_IS_EXTERNAL;
 import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.TABLE_LOCATION_URI;
 import static org.apache.flink.table.catalog.CatalogPropertiesUtil.FLINK_PROPERTY_PREFIX;
+import static org.apache.flink.table.catalog.CatalogPropertiesUtil.IS_GENERIC;
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /** Utils for Hive-backed tables. */
@@ -340,6 +343,7 @@ public class HiveTableUtil {
 
     public static Table instantiateHiveTable(
             ObjectPath tablePath, CatalogBaseTable table, HiveConf hiveConf) {
+        final boolean isView = table instanceof CatalogView;
         // let Hive set default parameters for us, e.g. serialization.format
         Table hiveTable =
                 org.apache.hadoop.hive.ql.metadata.Table.getEmptyTable(
@@ -358,7 +362,10 @@ public class HiveTableUtil {
         StorageDescriptor sd = hiveTable.getSd();
         HiveTableUtil.setDefaultStorageFormat(sd, hiveConf);
 
-        if (isHiveTable) {
+        // We always store the schema as properties for views, because a view schema may not map
+        // to a Hive schema. This also means views created by Flink cannot be used in Hive, which
+        // is fine because Hive cannot understand the expanded query anyway.
+        if (isHiveTable && !isView) {
             HiveTableUtil.initiateTableFromProperties(hiveTable, properties, hiveConf);
             List<FieldSchema> allColumns = HiveTableUtil.createHiveColumns(table.getSchema());
             // Table columns and partition keys
@@ -394,10 +401,19 @@ public class HiveTableUtil {
 
             properties.putAll(tableSchemaProps.asMap());
             properties = maskFlinkProperties(properties);
+            // We may need to explicitly set the is_generic flag in the following cases:
+            // 1. the user doesn't specify 'connector' or 'connector.type' when creating a table;
+            //    without 'is_generic', such a table would be considered a Hive table upon retrieval
+            // 2. when creating views, which don't have connector properties
+            if (isView
+                    || (!properties.containsKey(FLINK_PROPERTY_PREFIX + CONNECTOR.key())
+                            && !properties.containsKey(FLINK_PROPERTY_PREFIX + CONNECTOR_TYPE))) {
+                properties.put(IS_GENERIC, "true");
+            }
             hiveTable.setParameters(properties);
         }
 
-        if (table instanceof CatalogView) {
+        if (isView) {
             // TODO: [FLINK-12398] Support partitioned view in catalog API
             hiveTable.setPartitionKeys(new ArrayList<>());
 
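For readers skimming the hunk above: the is_generic decision reduces to the
predicate below (a simplified restatement, not code from the commit; the
string literals are the values the imported constants resolve to here, i.e.
FLINK_PROPERTY_PREFIX = "flink.", CONNECTOR.key() = "connector",
CONNECTOR_TYPE = "connector.type"):

    import java.util.Map;

    class IsGenericSketch {
        // Simplified restatement of the check in instantiateHiveTable.
        static boolean shouldMarkGeneric(boolean isView, Map<String, String> maskedProps) {
            // Flink properties are masked with the "flink." prefix before
            // being written to the Hive metastore.
            boolean hasConnector =
                    maskedProps.containsKey("flink.connector")
                            || maskedProps.containsKey("flink.connector.type");
            // Views never carry connector properties; generic tables created
            // without one would otherwise be mistaken for Hive tables on
            // retrieval.
            return isView || !hasConnector;
        }
    }
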
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/DDLOperationConverter.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/DDLOperationConverter.java
index e06d994..7bef1d7 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/DDLOperationConverter.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/DDLOperationConverter.java
@@ -309,7 +309,6 @@ public class DDLOperationConverter {
             props.putAll(baseTable.getOptions());
             comment = baseTable.getComment();
         } else {
-            markHiveConnector(props);
             comment = desc.getComment();
             if (desc.getTblProps() != null) {
                 props.putAll(desc.getTblProps());
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
index 43d1099..1dbc40f 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
@@ -26,8 +26,10 @@ import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
@@ -559,20 +561,20 @@ public class HiveDialectITCase {
         tableEnv.executeSql(
                 "create view v(vx) comment 'v comment' tblproperties 
('k1'='v1') as select x from tbl");
         ObjectPath viewPath = new ObjectPath("default", "v");
-        Table hiveView = hiveCatalog.getHiveTable(viewPath);
-        assertEquals(TableType.VIRTUAL_VIEW.name(), hiveView.getTableType());
-        assertEquals("vx", hiveView.getSd().getCols().get(0).getName());
-        assertEquals("v1", hiveView.getParameters().get("k1"));
+        CatalogBaseTable catalogBaseTable = hiveCatalog.getTable(viewPath);
+        assertTrue(catalogBaseTable instanceof CatalogView);
+        assertEquals("vx", 
catalogBaseTable.getUnresolvedSchema().getColumns().get(0).getName());
+        assertEquals("v1", catalogBaseTable.getOptions().get("k1"));
 
         // change properties
         tableEnv.executeSql("alter view v set tblproperties ('k1'='v11')");
-        hiveView = hiveCatalog.getHiveTable(viewPath);
-        assertEquals("v11", hiveView.getParameters().get("k1"));
+        catalogBaseTable = hiveCatalog.getTable(viewPath);
+        assertEquals("v11", catalogBaseTable.getOptions().get("k1"));
 
         // change query
         tableEnv.executeSql("alter view v as select y from tbl");
-        hiveView = hiveCatalog.getHiveTable(viewPath);
-        assertEquals("y", hiveView.getSd().getCols().get(0).getName());
+        catalogBaseTable = hiveCatalog.getTable(viewPath);
+        assertEquals("y", 
catalogBaseTable.getUnresolvedSchema().getColumns().get(0).getName());
 
         // rename
         tableEnv.executeSql("alter view v rename to v1");
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
index 4aac265..b3948af 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.catalog.CatalogFunction;
 import org.apache.flink.table.catalog.CatalogFunctionImpl;
 import org.apache.flink.table.catalog.CatalogPartition;
 import org.apache.flink.table.catalog.CatalogPropertiesUtil;
+import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.FunctionLanguage;
 import org.apache.flink.table.catalog.ObjectPath;
@@ -40,6 +41,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
@@ -317,6 +319,23 @@ public class HiveCatalogGenericMetadataTest extends HiveCatalogMetadataTestBase
         assertEquals(FunctionLanguage.JAVA, catalogFunction.getFunctionLanguage());
     }
 
+    @Test
+    public void testGenericTableWithoutConnectorProp() throws Exception {
+        catalog.createDatabase(db1, createDb(), false);
+        TableSchema tableSchema =
+                TableSchema.builder()
+                        .fields(
+                                new String[] {"s", "ts"},
+                                new DataType[] {DataTypes.STRING(), DataTypes.TIMESTAMP_LTZ(3)})
+                        .watermark("ts", "ts-INTERVAL '1' SECOND", DataTypes.TIMESTAMP_LTZ(3))
+                        .build();
+        CatalogTable catalogTable = new CatalogTableImpl(tableSchema, Collections.emptyMap(), null);
+        catalog.createTable(path1, catalogTable, false);
+        CatalogTable retrievedTable = (CatalogTable) catalog.getTable(path1);
+        assertEquals(tableSchema, retrievedTable.getSchema());
+        assertEquals(Collections.emptyMap(), retrievedTable.getOptions());
+    }
+
     // ------ functions ------
 
     @Test
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
index b8ee7be..66bbcdd 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
@@ -22,16 +22,21 @@ import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable;
 import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable;
 import org.apache.flink.table.HiveVersionTestUtil;
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.constraints.UniqueConstraint;
+import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogFunction;
 import org.apache.flink.table.catalog.CatalogFunctionImpl;
 import org.apache.flink.table.catalog.CatalogPartition;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogPropertiesUtil;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.CatalogTestUtil;
+import org.apache.flink.table.catalog.CatalogView;
 import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
+import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBase;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBinary;
@@ -43,10 +48,13 @@ import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataString;
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.catalog.stats.Date;
 import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.types.AbstractDataType;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.util.StringUtils;
 
 import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.udf.UDFRand;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFAbs;
@@ -54,10 +62,14 @@ import org.junit.Assume;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.IDENTIFIER;
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -94,6 +106,67 @@ public class HiveCatalogHiveMetadataTest extends HiveCatalogMetadataTestBase {
     }
 
     // ------ table and column stats ------
+
+    @Test
+    public void testViewCompatibility() throws Exception {
+        // we always store view schema via properties now
+        // make sure non-generic views created previously can still be used
+        catalog.createDatabase(db1, createDb(), false);
+        Table hiveView =
+                org.apache.hadoop.hive.ql.metadata.Table.getEmptyTable(
+                        path1.getDatabaseName(), path1.getObjectName());
+        // mark as a view
+        hiveView.setTableType(TableType.VIRTUAL_VIEW.name());
+        final String originQuery = "view origin query";
+        final String expandedQuery = "view expanded query";
+        hiveView.setViewOriginalText(originQuery);
+        hiveView.setViewExpandedText(expandedQuery);
+        // set schema in SD
+        Schema schema =
+                Schema.newBuilder()
+                        .fromFields(
+                                new String[] {"i", "s"},
+                                new AbstractDataType[] {DataTypes.INT(), DataTypes.STRING()})
+                        .build();
+        List<FieldSchema> fields = new ArrayList<>();
+        for (Schema.UnresolvedColumn column : schema.getColumns()) {
+            String name = column.getName();
+            DataType type = (DataType) ((Schema.UnresolvedPhysicalColumn) column).getDataType();
+            fields.add(
+                    new FieldSchema(
+                            name, HiveTypeUtil.toHiveTypeInfo(type, true).getTypeName(), null));
+        }
+        hiveView.getSd().setCols(fields);
+        // test mark as non-generic with is_generic
+        hiveView.getParameters().put(CatalogPropertiesUtil.IS_GENERIC, "false");
+        // add some other properties
+        hiveView.getParameters().put("k1", "v1");
+
+        ((HiveCatalog) catalog).client.createTable(hiveView);
+        CatalogBaseTable baseTable = catalog.getTable(path1);
+        assertTrue(baseTable instanceof CatalogView);
+        CatalogView catalogView = (CatalogView) baseTable;
+        assertEquals(schema, catalogView.getUnresolvedSchema());
+        assertEquals(originQuery, catalogView.getOriginalQuery());
+        assertEquals(expandedQuery, catalogView.getExpandedQuery());
+        assertEquals("v1", catalogView.getOptions().get("k1"));
+
+        // test mark as non-generic with connector
+        hiveView.setDbName(path3.getDatabaseName());
+        hiveView.setTableName(path3.getObjectName());
+        hiveView.getParameters().remove(CatalogPropertiesUtil.IS_GENERIC);
+        hiveView.getParameters().put(CONNECTOR.key(), IDENTIFIER);
+
+        ((HiveCatalog) catalog).client.createTable(hiveView);
+        baseTable = catalog.getTable(path3);
+        assertTrue(baseTable instanceof CatalogView);
+        catalogView = (CatalogView) baseTable;
+        assertEquals(schema, catalogView.getUnresolvedSchema());
+        assertEquals(originQuery, catalogView.getOriginalQuery());
+        assertEquals(expandedQuery, catalogView.getExpandedQuery());
+        assertEquals("v1", catalogView.getOptions().get("k1"));
+    }
+
     @Test
     public void testAlterTableColumnStatistics() throws Exception {
         String hiveVersion = ((HiveCatalog) catalog).getHiveVersion();
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
index 3e4e32a..221c636 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
@@ -22,6 +22,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableSchema;
@@ -31,6 +33,7 @@ import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableBuilder;
+import org.apache.flink.table.catalog.CatalogView;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.descriptors.FileSystem;
@@ -38,6 +41,7 @@ import org.apache.flink.table.descriptors.FormatDescriptor;
 import org.apache.flink.table.descriptors.OldCsv;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
+import org.apache.flink.table.types.AbstractDataType;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.FileUtils;
@@ -497,4 +501,58 @@ public class HiveCatalogITCase {
         assertEquals("x", catalogTable.getSchema().getFieldNames()[0]);
         assertEquals(DataTypes.INT(), catalogTable.getSchema().getFieldDataTypes()[0]);
     }
+
+    @Test
+    public void testViewSchema() throws Exception {
+        TableEnvironment tableEnv =
+                HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.DEFAULT);
+        tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+        tableEnv.useCatalog(hiveCatalog.getName());
+
+        tableEnv.executeSql("create database db1");
+        try {
+            tableEnv.useDatabase("db1");
+            tableEnv.executeSql(
+                    "create table src(x int,ts timestamp(3)) with ('connector'='datagen','number-of-rows'='10')");
+            tableEnv.executeSql("create view v1 as select x,ts from src order by x limit 3");
+
+            CatalogView catalogView =
+                    (CatalogView) hiveCatalog.getTable(new ObjectPath("db1", "v1"));
+            Schema viewSchema = catalogView.getUnresolvedSchema();
+            assertEquals(
+                    Schema.newBuilder()
+                            .fromFields(
+                                    new String[] {"x", "ts"},
+                                    new AbstractDataType[] {
+                                        DataTypes.INT(), DataTypes.TIMESTAMP(3)
+                                    })
+                            .build(),
+                    viewSchema);
+
+            List<Row> results =
+                    CollectionUtil.iteratorToList(
+                            tableEnv.executeSql("select x from v1").collect());
+            assertEquals(3, results.size());
+
+            tableEnv.executeSql(
+                    "create view v2 (v2_x,v2_ts) comment 'v2 comment' as select x,cast(ts as timestamp_ltz(3)) from v1");
+            catalogView = (CatalogView) hiveCatalog.getTable(new ObjectPath("db1", "v2"));
+            assertEquals(
+                    Schema.newBuilder()
+                            .fromFields(
+                                    new String[] {"v2_x", "v2_ts"},
+                                    new AbstractDataType[] {
+                                        DataTypes.INT(), DataTypes.TIMESTAMP_LTZ(3)
+                                    })
+                            .build(),
+                    catalogView.getUnresolvedSchema());
+            assertEquals("v2 comment", catalogView.getComment());
+            results =
+                    CollectionUtil.iteratorToList(
+                            tableEnv.executeSql("select * from v2").collect());
+            assertEquals(3, results.size());
+        } finally {
+            tableEnv.executeSql("drop database db1 cascade");
+        }
+    }
 }
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
index a588343..a24efd5 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
@@ -125,7 +125,7 @@ public abstract class CatalogTestBase extends CatalogTest {
                         String.format("select * from %s", t1),
                         String.format(
                                 "select * from %s.%s", TEST_CATALOG_NAME, 
path1.getFullName()),
-                        getBatchTableProperties());
+                        Collections.emptyMap());
         return new ResolvedCatalogView(origin, resolvedSchema);
     }
 
@@ -139,7 +139,7 @@ public abstract class CatalogTestBase extends CatalogTest {
                         String.format("select * from %s", t2),
                         String.format(
                                 "select * from %s.%s", TEST_CATALOG_NAME, 
path2.getFullName()),
-                        getBatchTableProperties());
+                        Collections.emptyMap());
         return new ResolvedCatalogView(origin, resolvedSchema);
     }
 
