This is an automated email from the ASF dual-hosted git repository.
lirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0b68682 [FLINK-22884][hive] HiveCatalog should mark views as generic
and store schema in properties
0b68682 is described below
commit 0b686824c33ad976cefb588c5a272a09de4b23ca
Author: Rui Li <[email protected]>
AuthorDate: Fri Jun 11 21:25:46 2021 +0800
[FLINK-22884][hive] HiveCatalog should mark views as generic and store
schema in properties
This closes #16149
---
.../flink/table/catalog/hive/HiveCatalog.java | 1 -
.../table/catalog/hive/util/HiveTableUtil.java | 20 +++++-
.../delegation/hive/DDLOperationConverter.java | 1 -
.../flink/connectors/hive/HiveDialectITCase.java | 18 +++---
.../hive/HiveCatalogGenericMetadataTest.java | 19 ++++++
.../catalog/hive/HiveCatalogHiveMetadataTest.java | 73 ++++++++++++++++++++++
.../table/catalog/hive/HiveCatalogITCase.java | 58 +++++++++++++++++
.../flink/table/catalog/CatalogTestBase.java | 4 +-
8 files changed, 180 insertions(+), 14 deletions(-)
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index e6bb77e..cf84056 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -705,7 +705,6 @@ public class HiveCatalog extends AbstractCatalog {
Table table = client.getTable(tablePath.getDatabaseName(),
tablePath.getObjectName());
boolean isHiveTable;
if
(table.getParameters().containsKey(CatalogPropertiesUtil.IS_GENERIC)) {
- // check is_generic to be backward compatible
isHiveTable =
!Boolean.parseBoolean(
table.getParameters().remove(CatalogPropertiesUtil.IS_GENERIC));
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
index 2e38220..d421194 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
@@ -78,6 +78,9 @@ import static
org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableS
import static
org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.TABLE_IS_EXTERNAL;
import static
org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.TABLE_LOCATION_URI;
import static
org.apache.flink.table.catalog.CatalogPropertiesUtil.FLINK_PROPERTY_PREFIX;
+import static org.apache.flink.table.catalog.CatalogPropertiesUtil.IS_GENERIC;
+import static
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
import static org.apache.flink.util.Preconditions.checkArgument;
/** Utils to for Hive-backed table. */
@@ -340,6 +343,7 @@ public class HiveTableUtil {
public static Table instantiateHiveTable(
ObjectPath tablePath, CatalogBaseTable table, HiveConf hiveConf) {
+ final boolean isView = table instanceof CatalogView;
// let Hive set default parameters for us, e.g. serialization.format
Table hiveTable =
org.apache.hadoop.hive.ql.metadata.Table.getEmptyTable(
@@ -358,7 +362,10 @@ public class HiveTableUtil {
StorageDescriptor sd = hiveTable.getSd();
HiveTableUtil.setDefaultStorageFormat(sd, hiveConf);
- if (isHiveTable) {
+ // We always store schema as properties for view, because view schema
may not be mapped to
+ // hive schema. This also means views created by flink cannot be used
in hive, which is fine
+ // because hive cannot understand the expanded query anyway
+ if (isHiveTable && !isView) {
HiveTableUtil.initiateTableFromProperties(hiveTable, properties,
hiveConf);
List<FieldSchema> allColumns =
HiveTableUtil.createHiveColumns(table.getSchema());
// Table columns and partition keys
@@ -394,10 +401,19 @@ public class HiveTableUtil {
properties.putAll(tableSchemaProps.asMap());
properties = maskFlinkProperties(properties);
+ // we may need to explicitly set is_generic flag in the following
cases:
+ // 1. user doesn't specify 'connector' or 'connector.type' when
creating a table, w/o
+ // 'is_generic', such a table will be considered as a hive table
upon retrieval
+ // 2. when creating views which don't have connector properties
+ if (isView
+ || (!properties.containsKey(FLINK_PROPERTY_PREFIX +
CONNECTOR.key())
+ && !properties.containsKey(FLINK_PROPERTY_PREFIX +
CONNECTOR_TYPE))) {
+ properties.put(IS_GENERIC, "true");
+ }
hiveTable.setParameters(properties);
}
- if (table instanceof CatalogView) {
+ if (isView) {
// TODO: [FLINK-12398] Support partitioned view in catalog API
hiveTable.setPartitionKeys(new ArrayList<>());
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/DDLOperationConverter.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/DDLOperationConverter.java
index e06d994..7bef1d7 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/DDLOperationConverter.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/DDLOperationConverter.java
@@ -309,7 +309,6 @@ public class DDLOperationConverter {
props.putAll(baseTable.getOptions());
comment = baseTable.getComment();
} else {
- markHiveConnector(props);
comment = desc.getComment();
if (desc.getTblProps() != null) {
props.putAll(desc.getTblProps());
diff --git
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
index 43d1099..1dbc40f 100644
---
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
+++
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
@@ -26,8 +26,10 @@ import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;
@@ -559,20 +561,20 @@ public class HiveDialectITCase {
tableEnv.executeSql(
"create view v(vx) comment 'v comment' tblproperties
('k1'='v1') as select x from tbl");
ObjectPath viewPath = new ObjectPath("default", "v");
- Table hiveView = hiveCatalog.getHiveTable(viewPath);
- assertEquals(TableType.VIRTUAL_VIEW.name(), hiveView.getTableType());
- assertEquals("vx", hiveView.getSd().getCols().get(0).getName());
- assertEquals("v1", hiveView.getParameters().get("k1"));
+ CatalogBaseTable catalogBaseTable = hiveCatalog.getTable(viewPath);
+ assertTrue(catalogBaseTable instanceof CatalogView);
+ assertEquals("vx",
catalogBaseTable.getUnresolvedSchema().getColumns().get(0).getName());
+ assertEquals("v1", catalogBaseTable.getOptions().get("k1"));
// change properties
tableEnv.executeSql("alter view v set tblproperties ('k1'='v11')");
- hiveView = hiveCatalog.getHiveTable(viewPath);
- assertEquals("v11", hiveView.getParameters().get("k1"));
+ catalogBaseTable = hiveCatalog.getTable(viewPath);
+ assertEquals("v11", catalogBaseTable.getOptions().get("k1"));
// change query
tableEnv.executeSql("alter view v as select y from tbl");
- hiveView = hiveCatalog.getHiveTable(viewPath);
- assertEquals("y", hiveView.getSd().getCols().get(0).getName());
+ catalogBaseTable = hiveCatalog.getTable(viewPath);
+ assertEquals("y",
catalogBaseTable.getUnresolvedSchema().getColumns().get(0).getName());
// rename
tableEnv.executeSql("alter view v rename to v1");
diff --git
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
index 4aac265..b3948af 100644
---
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
+++
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogFunctionImpl;
import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPropertiesUtil;
+import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.FunctionLanguage;
import org.apache.flink.table.catalog.ObjectPath;
@@ -40,6 +41,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Map;
import static org.junit.Assert.assertEquals;
@@ -317,6 +319,23 @@ public class HiveCatalogGenericMetadataTest extends
HiveCatalogMetadataTestBase
assertEquals(FunctionLanguage.JAVA,
catalogFunction.getFunctionLanguage());
}
+ @Test
+ public void testGenericTableWithoutConnectorProp() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ TableSchema tableSchema =
+ TableSchema.builder()
+ .fields(
+ new String[] {"s", "ts"},
+ new DataType[] {DataTypes.STRING(),
DataTypes.TIMESTAMP_LTZ(3)})
+ .watermark("ts", "ts-INTERVAL '1' SECOND",
DataTypes.TIMESTAMP_LTZ(3))
+ .build();
+ CatalogTable catalogTable = new CatalogTableImpl(tableSchema,
Collections.emptyMap(), null);
+ catalog.createTable(path1, catalogTable, false);
+ CatalogTable retrievedTable = (CatalogTable) catalog.getTable(path1);
+ assertEquals(tableSchema, retrievedTable.getSchema());
+ assertEquals(Collections.emptyMap(), retrievedTable.getOptions());
+ }
+
// ------ functions ------
@Test
diff --git
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
index b8ee7be..66bbcdd 100644
---
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
+++
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
@@ -22,16 +22,21 @@ import
org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable;
import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable;
import org.apache.flink.table.HiveVersionTestUtil;
import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.constraints.UniqueConstraint;
+import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogFunctionImpl;
import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogPropertiesUtil;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.CatalogTestUtil;
+import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
+import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBase;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBinary;
@@ -43,10 +48,13 @@ import
org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataString;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.catalog.stats.Date;
import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.StringUtils;
import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.udf.UDFRand;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFAbs;
@@ -54,10 +62,14 @@ import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import static
org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.IDENTIFIER;
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@@ -94,6 +106,67 @@ public class HiveCatalogHiveMetadataTest extends
HiveCatalogMetadataTestBase {
}
// ------ table and column stats ------
+
+ @Test
+ public void testViewCompatibility() throws Exception {
+ // we always store view schema via properties now
+ // make sure non-generic views created previously can still be used
+ catalog.createDatabase(db1, createDb(), false);
+ Table hiveView =
+ org.apache.hadoop.hive.ql.metadata.Table.getEmptyTable(
+ path1.getDatabaseName(), path1.getObjectName());
+ // mark as a view
+ hiveView.setTableType(TableType.VIRTUAL_VIEW.name());
+ final String originQuery = "view origin query";
+ final String expandedQuery = "view expanded query";
+ hiveView.setViewOriginalText(originQuery);
+ hiveView.setViewExpandedText(expandedQuery);
+ // set schema in SD
+ Schema schema =
+ Schema.newBuilder()
+ .fromFields(
+ new String[] {"i", "s"},
+ new AbstractDataType[] {DataTypes.INT(),
DataTypes.STRING()})
+ .build();
+ List<FieldSchema> fields = new ArrayList<>();
+ for (Schema.UnresolvedColumn column : schema.getColumns()) {
+ String name = column.getName();
+ DataType type = (DataType) ((Schema.UnresolvedPhysicalColumn)
column).getDataType();
+ fields.add(
+ new FieldSchema(
+ name, HiveTypeUtil.toHiveTypeInfo(type,
true).getTypeName(), null));
+ }
+ hiveView.getSd().setCols(fields);
+ // test mark as non-generic with is_generic
+ hiveView.getParameters().put(CatalogPropertiesUtil.IS_GENERIC,
"false");
+ // add some other properties
+ hiveView.getParameters().put("k1", "v1");
+
+ ((HiveCatalog) catalog).client.createTable(hiveView);
+ CatalogBaseTable baseTable = catalog.getTable(path1);
+ assertTrue(baseTable instanceof CatalogView);
+ CatalogView catalogView = (CatalogView) baseTable;
+ assertEquals(schema, catalogView.getUnresolvedSchema());
+ assertEquals(originQuery, catalogView.getOriginalQuery());
+ assertEquals(expandedQuery, catalogView.getExpandedQuery());
+ assertEquals("v1", catalogView.getOptions().get("k1"));
+
+ // test mark as non-generic with connector
+ hiveView.setDbName(path3.getDatabaseName());
+ hiveView.setTableName(path3.getObjectName());
+ hiveView.getParameters().remove(CatalogPropertiesUtil.IS_GENERIC);
+ hiveView.getParameters().put(CONNECTOR.key(), IDENTIFIER);
+
+ ((HiveCatalog) catalog).client.createTable(hiveView);
+ baseTable = catalog.getTable(path3);
+ assertTrue(baseTable instanceof CatalogView);
+ catalogView = (CatalogView) baseTable;
+ assertEquals(schema, catalogView.getUnresolvedSchema());
+ assertEquals(originQuery, catalogView.getOriginalQuery());
+ assertEquals(expandedQuery, catalogView.getExpandedQuery());
+ assertEquals("v1", catalogView.getOptions().get("k1"));
+ }
+
@Test
public void testAlterTableColumnStatistics() throws Exception {
String hiveVersion = ((HiveCatalog) catalog).getHiveVersion();
diff --git
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
index 3e4e32a..221c636 100644
---
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
+++
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
@@ -22,6 +22,8 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
@@ -31,6 +33,7 @@ import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableBuilder;
+import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.descriptors.FileSystem;
@@ -38,6 +41,7 @@ import org.apache.flink.table.descriptors.FormatDescriptor;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.factories.FactoryUtil;
import
org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
+import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.FileUtils;
@@ -497,4 +501,58 @@ public class HiveCatalogITCase {
assertEquals("x", catalogTable.getSchema().getFieldNames()[0]);
assertEquals(DataTypes.INT(),
catalogTable.getSchema().getFieldDataTypes()[0]);
}
+
+ @Test
+ public void testViewSchema() throws Exception {
+ TableEnvironment tableEnv =
+
HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.DEFAULT);
+ tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+ tableEnv.useCatalog(hiveCatalog.getName());
+
+ tableEnv.executeSql("create database db1");
+ try {
+ tableEnv.useDatabase("db1");
+ tableEnv.executeSql(
+ "create table src(x int,ts timestamp(3)) with
('connector'='datagen','number-of-rows'='10')");
+ tableEnv.executeSql("create view v1 as select x,ts from src order
by x limit 3");
+
+ CatalogView catalogView =
+ (CatalogView) hiveCatalog.getTable(new ObjectPath("db1",
"v1"));
+ Schema viewSchema = catalogView.getUnresolvedSchema();
+ assertEquals(
+ Schema.newBuilder()
+ .fromFields(
+ new String[] {"x", "ts"},
+ new AbstractDataType[] {
+ DataTypes.INT(), DataTypes.TIMESTAMP(3)
+ })
+ .build(),
+ viewSchema);
+
+ List<Row> results =
+ CollectionUtil.iteratorToList(
+ tableEnv.executeSql("select x from v1").collect());
+ assertEquals(3, results.size());
+
+ tableEnv.executeSql(
+ "create view v2 (v2_x,v2_ts) comment 'v2 comment' as
select x,cast(ts as timestamp_ltz(3)) from v1");
+ catalogView = (CatalogView) hiveCatalog.getTable(new
ObjectPath("db1", "v2"));
+ assertEquals(
+ Schema.newBuilder()
+ .fromFields(
+ new String[] {"v2_x", "v2_ts"},
+ new AbstractDataType[] {
+ DataTypes.INT(),
DataTypes.TIMESTAMP_LTZ(3)
+ })
+ .build(),
+ catalogView.getUnresolvedSchema());
+ assertEquals("v2 comment", catalogView.getComment());
+ results =
+ CollectionUtil.iteratorToList(
+ tableEnv.executeSql("select * from v2").collect());
+ assertEquals(3, results.size());
+ } finally {
+ tableEnv.executeSql("drop database db1 cascade");
+ }
+ }
}
diff --git
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
index a588343..a24efd5 100644
---
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
+++
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
@@ -125,7 +125,7 @@ public abstract class CatalogTestBase extends CatalogTest {
String.format("select * from %s", t1),
String.format(
"select * from %s.%s", TEST_CATALOG_NAME,
path1.getFullName()),
- getBatchTableProperties());
+ Collections.emptyMap());
return new ResolvedCatalogView(origin, resolvedSchema);
}
@@ -139,7 +139,7 @@ public abstract class CatalogTestBase extends CatalogTest {
String.format("select * from %s", t2),
String.format(
"select * from %s.%s", TEST_CATALOG_NAME,
path2.getFullName()),
- getBatchTableProperties());
+ Collections.emptyMap());
return new ResolvedCatalogView(origin, resolvedSchema);
}