This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new cc6258c [FLINK-18378] Improve CatalogTable schema resolution
cc6258c is described below
commit cc6258c2876f76b68d9516508325af03a3e39c3e
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Fri Jun 19 15:53:08 2020 +0200
[FLINK-18378] Improve CatalogTable schema resolution
This closes #12725
---
.../api/internal/CatalogTableSchemaResolver.java | 35 ++--
.../table/api/internal/TableEnvironmentImpl.java | 7 +-
.../apache/flink/table/catalog/CatalogManager.java | 42 ++--
.../flink/table/catalog/CatalogTableImpl.java | 8 -
.../apache/flink/table/catalog/CatalogTest.java | 2 +-
.../table/planner/catalog/CatalogSchemaTable.java | 61 +++---
.../planner/catalog/DatabaseCalciteSchema.java | 5 +-
.../planner/catalog/JavaCatalogTableTest.java | 211 +++++++++++++++++++++
.../plan/FlinkCalciteCatalogReaderTest.java | 9 +-
.../table/planner/catalog/JavaCatalogTableTest.xml | 153 +++++++++++++++
.../flink/table/catalog/DatabaseCalciteSchema.java | 56 ++++--
.../catalog/QueryOperationCatalogViewTable.java | 9 +-
.../flink/table/api/internal/TableEnvImpl.scala | 2 +-
13 files changed, 490 insertions(+), 110 deletions(-)
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CatalogTableSchemaResolver.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CatalogTableSchemaResolver.java
index 3e1e08a..e4d011f 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CatalogTableSchemaResolver.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CatalogTableSchemaResolver.java
@@ -27,8 +27,10 @@ import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
import org.apache.flink.table.types.logical.TimestampKind;
import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
import org.apache.flink.table.types.utils.TypeConversions;
/**
@@ -76,17 +78,17 @@ public class CatalogTableSchemaResolver {
for (int i = 0; i < tableSchema.getFieldCount(); ++i) {
TableColumn tableColumn =
tableSchema.getTableColumns().get(i);
DataType fieldType = fieldTypes[i];
- if (tableColumn.isGenerated() &&
isProctimeType(tableColumn.getExpr().get(), tableSchema)) {
- if (fieldNames[i].equals(rowtime)) {
- throw new TableException("Watermark can
not be defined for a processing time attribute column.");
+
+ if (tableColumn.isGenerated()) {
+ fieldType =
resolveExpressionDataType(tableColumn.getExpr().get(), tableSchema);
+ if (isProctime(fieldType)) {
+ if (fieldNames[i].equals(rowtime)) {
+ throw new
TableException("Watermark can not be defined for a processing time attribute
column.");
+ }
}
- TimestampType originalType = (TimestampType)
fieldType.getLogicalType();
- LogicalType proctimeType = new TimestampType(
- originalType.isNullable(),
- TimestampKind.PROCTIME,
- originalType.getPrecision());
- fieldType =
TypeConversions.fromLogicalToDataType(proctimeType);
- } else if (isStreamingMode &&
fieldNames[i].equals(rowtime)) {
+ }
+
+ if (isStreamingMode && fieldNames[i].equals(rowtime)) {
TimestampType originalType = (TimestampType)
fieldType.getLogicalType();
LogicalType rowtimeType = new TimestampType(
originalType.isNullable(),
@@ -94,6 +96,7 @@ public class CatalogTableSchemaResolver {
originalType.getPrecision());
fieldType =
TypeConversions.fromLogicalToDataType(rowtimeType);
}
+
if (tableColumn.isGenerated()) {
builder.field(fieldNames[i], fieldType,
tableColumn.getExpr().get());
} else {
@@ -107,12 +110,16 @@ public class CatalogTableSchemaResolver {
return builder.build();
}
- private boolean isProctimeType(String expr, TableSchema tableSchema) {
+ private boolean isProctime(DataType exprType) {
+ return LogicalTypeChecks.hasFamily(exprType.getLogicalType(),
LogicalTypeFamily.TIMESTAMP) &&
+
LogicalTypeChecks.isProctimeAttribute(exprType.getLogicalType());
+ }
+
+ private DataType resolveExpressionDataType(String expr, TableSchema
tableSchema) {
ResolvedExpression resolvedExpr =
parser.parseSqlExpression(expr, tableSchema);
if (resolvedExpr == null) {
- return false;
+ throw new ValidationException("Could not resolve field
expression: " + expr);
}
- LogicalType type =
resolvedExpr.getOutputDataType().getLogicalType();
- return type instanceof TimestampType && ((TimestampType)
type).getKind() == TimestampKind.PROCTIME;
+ return resolvedExpr.getOutputDataType();
}
}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index a95a3f4..95deb43 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -519,9 +519,8 @@ public class TableEnvironmentImpl implements
TableEnvironmentInternal {
private Optional<CatalogQueryOperation>
scanInternal(UnresolvedIdentifier identifier) {
ObjectIdentifier tableIdentifier =
catalogManager.qualifyIdentifier(identifier);
- return catalogManager.getTable(tableIdentifier).map(t -> {
- return new CatalogQueryOperation(tableIdentifier,
t.getTable().getSchema());
- });
+ return catalogManager.getTable(tableIdentifier)
+ .map(t -> new CatalogQueryOperation(tableIdentifier,
t.getResolvedSchema()));
}
@Override
@@ -1062,7 +1061,7 @@ public class TableEnvironmentImpl implements
TableEnvironmentInternal {
Optional<CatalogManager.TableLookupResult> result =
catalogManager.getTable(describeTableOperation.getSqlIdentifier());
if (result.isPresent()) {
- return
buildDescribeResult(result.get().getTable().getSchema());
+ return
buildDescribeResult(result.get().getResolvedSchema());
} else {
throw new ValidationException(String.format(
"Tables or views with the
identifier '%s' doesn't exist",
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index 9c1e1d3..f80efd1 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.catalog;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.CatalogNotExistException;
@@ -323,18 +324,25 @@ public final class CatalogManager {
public static class TableLookupResult {
private final boolean isTemporary;
private final CatalogBaseTable table;
+ private final TableSchema resolvedSchema;
- private static TableLookupResult temporary(CatalogBaseTable
table) {
- return new TableLookupResult(true, table);
+ @VisibleForTesting
+ public static TableLookupResult temporary(CatalogBaseTable
table, TableSchema resolvedSchema) {
+ return new TableLookupResult(true, table,
resolvedSchema);
}
- private static TableLookupResult permanent(CatalogBaseTable
table) {
- return new TableLookupResult(false, table);
+ @VisibleForTesting
+ public static TableLookupResult permanent(CatalogBaseTable
table, TableSchema resolvedSchema) {
+ return new TableLookupResult(false, table,
resolvedSchema);
}
- private TableLookupResult(boolean isTemporary, CatalogBaseTable
table) {
+ private TableLookupResult(
+ boolean isTemporary,
+ CatalogBaseTable table,
+ TableSchema resolvedSchema) {
this.isTemporary = isTemporary;
this.table = table;
+ this.resolvedSchema = resolvedSchema;
}
public boolean isTemporary() {
@@ -344,6 +352,10 @@ public final class CatalogManager {
public CatalogBaseTable getTable() {
return table;
}
+
+ public TableSchema getResolvedSchema() {
+ return resolvedSchema;
+ }
}
/**
@@ -357,21 +369,15 @@ public final class CatalogManager {
Preconditions.checkNotNull(schemaResolver, "schemaResolver
should not be null");
CatalogBaseTable temporaryTable =
temporaryTables.get(objectIdentifier);
if (temporaryTable != null) {
- return
Optional.of(TableLookupResult.temporary(resolveTableSchema(temporaryTable)));
+ TableSchema resolvedSchema =
resolveTableSchema(temporaryTable);
+ return
Optional.of(TableLookupResult.temporary(temporaryTable, resolvedSchema));
} else {
- Optional<TableLookupResult> result =
getPermanentTable(objectIdentifier);
- return result.map(tableLookupResult ->
-
TableLookupResult.permanent(resolveTableSchema(tableLookupResult.getTable())));
+ return getPermanentTable(objectIdentifier);
}
}
- private CatalogBaseTable resolveTableSchema(CatalogBaseTable table) {
- if (!(table instanceof CatalogTableImpl)) {
- return table;
- }
- CatalogTableImpl catalogTableImpl = (CatalogTableImpl) table;
- TableSchema newTableSchema =
schemaResolver.resolve(catalogTableImpl.getSchema());
- return catalogTableImpl.copy(newTableSchema);
+ private TableSchema resolveTableSchema(CatalogBaseTable table) {
+ return schemaResolver.resolve(table.getSchema());
}
/**
@@ -398,7 +404,9 @@ public final class CatalogManager {
ObjectPath objectPath = objectIdentifier.toObjectPath();
if (currentCatalog != null) {
try {
- return
Optional.of(TableLookupResult.permanent(currentCatalog.getTable(objectPath)));
+ CatalogBaseTable catalogTable =
currentCatalog.getTable(objectPath);
+ TableSchema resolvedSchema =
resolveTableSchema(catalogTable);
+ return
Optional.of(TableLookupResult.permanent(catalogTable, resolvedSchema));
} catch (TableNotExistException e) {
// Ignore.
}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
index dfe5d8b..9ac9c5c 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
@@ -88,14 +88,6 @@ public class CatalogTableImpl extends AbstractCatalogTable {
return new CatalogTableImpl(getSchema(), getPartitionKeys(),
options, getComment());
}
- public CatalogTable copy(TableSchema tableSchema) {
- return new CatalogTableImpl(
- tableSchema.copy(),
- new ArrayList<>(getPartitionKeys()),
- new HashMap<>(getProperties()),
- getComment());
- }
-
/**
* Construct a {@link CatalogTableImpl} from complete properties that
contains table schema.
*/
diff --git
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java
index 2ad212b..ca7bda8 100644
---
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java
+++
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java
@@ -1335,7 +1335,7 @@ public abstract class CatalogTest {
@Override
public TableSchema getSchema() {
- return null;
+ return TableSchema.builder().build();
}
@Override
diff --git
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
index 23a8d80..ebf53f4 100644
---
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
+++
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
@@ -18,16 +18,16 @@
package org.apache.flink.table.planner.catalog;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableColumn;
-import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogManager.TableLookupResult;
import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ConnectorCatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.factories.TableFactoryUtil;
@@ -67,10 +67,9 @@ public class CatalogSchemaTable extends AbstractTable
implements TemporalTable {
//~ Instance fields
--------------------------------------------------------
private final ObjectIdentifier tableIdentifier;
- private final CatalogBaseTable catalogBaseTable;
+ private final TableLookupResult lookupResult;
private final FlinkStatistic statistic;
private final boolean isStreamingMode;
- private final boolean isTemporary;
private final Catalog catalog;
//~ Constructors
-----------------------------------------------------------
@@ -79,25 +78,22 @@ public class CatalogSchemaTable extends AbstractTable
implements TemporalTable {
* Create a CatalogSchemaTable instance.
*
* @param tableIdentifier Table identifier
- * @param catalogBaseTable CatalogBaseTable instance which exists in
the catalog
+ * @param lookupResult A result of catalog lookup
* @param statistic Table statistics
* @param catalog The catalog which the schema table belongs to
* @param isStreaming If the table is for streaming mode
- * @param isTemporary If the table is temporary
*/
public CatalogSchemaTable(
ObjectIdentifier tableIdentifier,
- CatalogBaseTable catalogBaseTable,
+ TableLookupResult lookupResult,
FlinkStatistic statistic,
Catalog catalog,
- boolean isStreaming,
- boolean isTemporary) {
+ boolean isStreaming) {
this.tableIdentifier = tableIdentifier;
- this.catalogBaseTable = catalogBaseTable;
+ this.lookupResult = lookupResult;
this.statistic = statistic;
this.catalog = catalog;
this.isStreamingMode = isStreaming;
- this.isTemporary = isTemporary;
}
//~ Methods
----------------------------------------------------------------
@@ -111,11 +107,11 @@ public class CatalogSchemaTable extends AbstractTable
implements TemporalTable {
}
public CatalogBaseTable getCatalogTable() {
- return catalogBaseTable;
+ return lookupResult.getTable();
}
public boolean isTemporary() {
- return isTemporary;
+ return lookupResult.isTemporary();
}
public boolean isStreamingMode() {
@@ -124,23 +120,13 @@ public class CatalogSchemaTable extends AbstractTable
implements TemporalTable {
@Override
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
- return getRowType(typeFactory, catalogBaseTable,
isStreamingMode);
- }
-
- @Override
- public FlinkStatistic getStatistic() {
- return statistic;
- }
-
- private RelDataType getRowType(RelDataTypeFactory typeFactory,
- CatalogBaseTable catalogBaseTable,
- boolean isStreamingMode) {
final FlinkTypeFactory flinkTypeFactory = (FlinkTypeFactory)
typeFactory;
- TableSchema tableSchema = catalogBaseTable.getSchema();
+ TableSchema tableSchema = lookupResult.getResolvedSchema();
final DataType[] fieldDataTypes =
tableSchema.getFieldDataTypes();
+ CatalogBaseTable catalogTable = lookupResult.getTable();
if (!isStreamingMode
- && catalogBaseTable instanceof ConnectorCatalogTable
- && ((ConnectorCatalogTable)
catalogBaseTable).getTableSource().isPresent()) {
+ && catalogTable instanceof ConnectorCatalogTable
+ && ((ConnectorCatalogTable<?, ?>)
catalogTable).getTableSource().isPresent()) {
// If the table source is bounded, materialize the time
attributes to normal TIMESTAMP type.
// Now for ConnectorCatalogTable, there is no way to
// deduce if it is bounded in the table environment, so
the data types in TableSchema
@@ -180,10 +166,15 @@ public class CatalogSchemaTable extends AbstractTable
implements TemporalTable {
}
return TableSourceUtil.getSourceRowType(
- flinkTypeFactory,
- tableSchema,
- scala.Option.empty(),
- isStreamingMode);
+ flinkTypeFactory,
+ tableSchema,
+ scala.Option.empty(),
+ isStreamingMode);
+ }
+
+ @Override
+ public FlinkStatistic getStatistic() {
+ return statistic;
}
@Override
@@ -199,15 +190,15 @@ public class CatalogSchemaTable extends AbstractTable
implements TemporalTable {
private Optional<TableSource<?>> findAndCreateTableSource() {
Optional<TableSource<?>> tableSource = Optional.empty();
try {
- if (catalogBaseTable instanceof CatalogTableImpl) {
+ if (lookupResult.getTable() instanceof CatalogTable) {
// Use an empty config for
TableSourceFactoryContextImpl since we can't fetch the
// actual TableConfig here. And currently the
empty config do not affect the logic.
- ReadableConfig config = new
TableConfig().getConfiguration();
+ ReadableConfig config = new Configuration();
TableSourceFactory.Context context =
- new
TableSourceFactoryContextImpl(tableIdentifier, (CatalogTable) catalogBaseTable,
config);
+ new
TableSourceFactoryContextImpl(tableIdentifier, (CatalogTable)
lookupResult.getTable(), config);
TableSource<?> source =
TableFactoryUtil.findAndCreateTableSource(context);
if (source instanceof StreamTableSource) {
- if (!isStreamingMode &&
!((StreamTableSource) source).isBounded()) {
+ if (!isStreamingMode &&
!((StreamTableSource<?>) source).isBounded()) {
throw new
ValidationException("Cannot query on an unbounded source in batch mode, but " +
tableIdentifier.asSummaryString() + " is unbounded.");
}
diff --git
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/DatabaseCalciteSchema.java
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/DatabaseCalciteSchema.java
index 66ee213..22ceb0f 100644
---
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/DatabaseCalciteSchema.java
+++
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/DatabaseCalciteSchema.java
@@ -78,11 +78,10 @@ class DatabaseCalciteSchema extends FlinkSchema {
FlinkStatistic statistic =
getStatistic(result.isTemporary(), table, identifier);
return new CatalogSchemaTable(
identifier,
- table,
+ result,
statistic,
catalogManager.getCatalog(catalogName).orElseThrow(IllegalStateException::new),
- isStreamingMode,
- result.isTemporary());
+ isStreamingMode);
})
.orElse(null);
}
diff --git
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.java
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.java
new file mode 100644
index 0000000..8771a51
--- /dev/null
+++
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.catalog;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.Tumble;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.planner.utils.TableTestBase;
+import org.apache.flink.table.planner.utils.TableTestUtil;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.api.Expressions.lit;
+
+/**
+ * Tests for resolving types of computed columns (including time attributes)
of tables from catalog.
+ */
+@RunWith(Parameterized.class)
+public class JavaCatalogTableTest extends TableTestBase {
+ @Parameterized.Parameters(name = "streamingMode = {0}")
+ public static Collection<Boolean> parameters() {
+ return Arrays.asList(true, false);
+ }
+
+ @Parameterized.Parameter
+ public boolean isStreamingMode;
+
+ private TableTestUtil getTestUtil() {
+ if (isStreamingMode) {
+ return streamTestUtil(new TableConfig());
+ } else {
+ return batchTestUtil(new TableConfig());
+ }
+ }
+
+ @Test
+ public void testResolvingSchemaOfCustomCatalogTableSql() throws
Exception {
+ TableTestUtil testUtil = getTestUtil();
+ TableEnvironment tableEnvironment = testUtil.getTableEnv();
+ GenericInMemoryCatalog genericInMemoryCatalog = new
GenericInMemoryCatalog("in-memory");
+ genericInMemoryCatalog.createTable(
+ new ObjectPath("default", "testTable"),
+ new CustomCatalogTable(isStreamingMode),
+ false);
+ tableEnvironment.registerCatalog("testCatalog",
genericInMemoryCatalog);
+ tableEnvironment.executeSql("CREATE VIEW testTable2 AS SELECT *
FROM testCatalog.`default`.testTable");
+
+ testUtil.verifyPlan(
+ "SELECT COUNT(*) FROM testTable2 GROUP BY
TUMBLE(rowtime, INTERVAL '10' MINUTE)");
+ }
+
+ @Test
+ public void testResolvingSchemaOfCustomCatalogTableTableApi() throws
Exception {
+ TableTestUtil testUtil = getTestUtil();
+ TableEnvironment tableEnvironment = testUtil.getTableEnv();
+ GenericInMemoryCatalog genericInMemoryCatalog = new
GenericInMemoryCatalog("in-memory");
+ genericInMemoryCatalog.createTable(
+ new ObjectPath("default", "testTable"),
+ new CustomCatalogTable(isStreamingMode),
+ false);
+ tableEnvironment.registerCatalog("testCatalog",
genericInMemoryCatalog);
+
+ Table table =
tableEnvironment.from("testCatalog.`default`.testTable")
+
.window(Tumble.over(lit(10).minute()).on($("rowtime")).as("w"))
+ .groupBy($("w"))
+ .select(lit(1).count());
+ testUtil.verifyPlan(table);
+ }
+
+ @Test
+ public void testResolvingProctimeOfCustomTableSql() throws Exception {
+ if (!isStreamingMode) {
+ // proctime not supported in batch
+ return;
+ }
+ TableTestUtil testUtil = getTestUtil();
+ TableEnvironment tableEnvironment = testUtil.getTableEnv();
+ GenericInMemoryCatalog genericInMemoryCatalog = new
GenericInMemoryCatalog("in-memory");
+ genericInMemoryCatalog.createTable(
+ new ObjectPath("default", "testTable"),
+ new CustomCatalogTable(isStreamingMode),
+ false);
+ tableEnvironment.registerCatalog("testCatalog",
genericInMemoryCatalog);
+
+ testUtil.verifyPlan("SELECT COUNT(*) FROM
testCatalog.`default`.testTable " +
+ "GROUP BY TUMBLE(proctime, INTERVAL '10' MINUTE)");
+ }
+
+ @Test
+ public void testResolvingProctimeOfCustomTableTableApi() throws
Exception {
+ if (!isStreamingMode) {
+ // proctime not supported in batch
+ return;
+ }
+ TableTestUtil testUtil = getTestUtil();
+ TableEnvironment tableEnvironment = testUtil.getTableEnv();
+ GenericInMemoryCatalog genericInMemoryCatalog = new
GenericInMemoryCatalog("in-memory");
+ genericInMemoryCatalog.createTable(
+ new ObjectPath("default", "testTable"),
+ new CustomCatalogTable(isStreamingMode),
+ false);
+ tableEnvironment.registerCatalog("testCatalog",
genericInMemoryCatalog);
+
+ Table table =
tableEnvironment.from("testCatalog.`default`.testTable")
+
.window(Tumble.over(lit(10).minute()).on($("proctime")).as("w"))
+ .groupBy($("w"))
+ .select(lit(1).count());
+ testUtil.verifyPlan(table);
+ }
+
+ private static class CustomCatalogTable implements CatalogTable {
+
+ private final boolean isStreamingMode;
+
+ private CustomCatalogTable(boolean isStreamingMode) {
+ this.isStreamingMode = isStreamingMode;
+ }
+
+ @Override
+ public boolean isPartitioned() {
+ return false;
+ }
+
+ @Override
+ public List<String> getPartitionKeys() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public CatalogTable copy(Map<String, String> options) {
+ return this;
+ }
+
+ @Override
+ public Map<String, String> toProperties() {
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public Map<String, String> getProperties() {
+ Map<String, String> map = new HashMap<>();
+ map.put("connector", "values");
+ map.put("bounded", Boolean.toString(!isStreamingMode));
+ return map;
+ }
+
+ @Override
+ public TableSchema getSchema() {
+ return TableSchema.builder()
+ .field("count", DataTypes.BIGINT())
+ .field("rowtime", DataTypes.TIMESTAMP())
+ .field("proctime", DataTypes.TIMESTAMP(),
"proctime()")
+ .watermark("rowtime", "rowtime - INTERVAL '5'
SECONDS", DataTypes.TIMESTAMP())
+ .build();
+ }
+
+ @Override
+ public String getComment() {
+ return null;
+ }
+
+ @Override
+ public CatalogBaseTable copy() {
+ return this;
+ }
+
+ @Override
+ public Optional<String> getDescription() {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional<String> getDetailedDescription() {
+ return Optional.empty();
+ }
+ }
+}
diff --git
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReaderTest.java
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReaderTest.java
index 3ec52507..a1a377d 100644
---
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReaderTest.java
+++
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReaderTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.planner.plan;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.ConnectorCatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
@@ -72,13 +73,15 @@ public class FlinkCalciteCatalogReaderTest {
@Test
public void testGetFlinkPreparingTableBase() {
// Mock CatalogSchemaTable.
+ TableSchema schema = TableSchema.builder().build();
CatalogSchemaTable mockTable = new CatalogSchemaTable(
ObjectIdentifier.of("a", "b", "c"),
- ConnectorCatalogTable.source(new TestTableSource(true,
TableSchema.builder().build()), true),
+
CatalogManager.TableLookupResult.permanent(ConnectorCatalogTable.source(
+ new TestTableSource(true, schema),
+ true), schema),
FlinkStatistic.UNKNOWN(),
null,
- true,
- false);
+ true);
rootSchemaPlus.add(tableMockName, mockTable);
Prepare.PreparingTable preparingTable = catalogReader
diff --git
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.xml
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.xml
new file mode 100644
index 0000000..821dba7
--- /dev/null
+++
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.xml
@@ -0,0 +1,153 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testResolvingProctimeOfCustomTableSql[streamingMode = true]">
+ <Resource name="sql">
+ <![CDATA[SELECT COUNT(*) FROM testCatalog.`default`.testTable GROUP BY
TUMBLE(proctime, INTERVAL '10' MINUTE)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(EXPR$0=[$1])
++- LogicalAggregate(group=[{0}], EXPR$0=[COUNT()])
+ +- LogicalProject($f0=[$TUMBLE($2, 600000:INTERVAL MINUTE)])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($1,
5000:INTERVAL SECOND)])
+ +- LogicalProject(count=[$0], rowtime=[$1], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[testCatalog, default, testTable]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+GroupWindowAggregate(window=[TumblingGroupWindow('w$, proctime, 600000)],
select=[COUNT(*) AS EXPR$0])
++- Exchange(distribution=[single])
+ +- Calc(select=[proctime])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime,
5000:INTERVAL SECOND)])
+ +- Calc(select=[count, rowtime, PROCTIME() AS proctime])
+ +- TableSourceScan(table=[[testCatalog, default, testTable]],
fields=[count, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testResolvingSchemaOfCustomCatalogTableTableApi[streamingMode = true]">
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(EXPR$0=[$0])
++- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($3)],
window=[TumblingGroupWindow('w, rowtime, 600000)], properties=[])
+ +- LogicalProject(count=[$0], rowtime=[$1], proctime=[$2], $f3=[1])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($1,
5000:INTERVAL SECOND)])
+ +- LogicalProject(count=[$0], rowtime=[$1], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[testCatalog, default, testTable]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+GroupWindowAggregate(window=[TumblingGroupWindow('w, rowtime, 600000)],
select=[COUNT($f3) AS EXPR$0])
++- Exchange(distribution=[single])
+ +- Calc(select=[count, rowtime, proctime, 1 AS $f3])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime,
5000:INTERVAL SECOND)])
+ +- Calc(select=[count, rowtime, PROCTIME() AS proctime])
+ +- TableSourceScan(table=[[testCatalog, default, testTable]],
fields=[count, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testResolvingProctimeOfCustomTableTableApi[streamingMode =
true]">
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(EXPR$0=[$0])
++- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($3)],
window=[TumblingGroupWindow('w, proctime, 600000)], properties=[])
+ +- LogicalProject(count=[$0], rowtime=[$1], proctime=[$2], $f3=[1])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($1,
5000:INTERVAL SECOND)])
+ +- LogicalProject(count=[$0], rowtime=[$1], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[testCatalog, default, testTable]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+GroupWindowAggregate(window=[TumblingGroupWindow('w, proctime, 600000)],
select=[COUNT($f3) AS EXPR$0])
++- Exchange(distribution=[single])
+ +- Calc(select=[count, rowtime, proctime, 1 AS $f3])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime,
5000:INTERVAL SECOND)])
+ +- Calc(select=[count, rowtime, PROCTIME() AS proctime])
+ +- TableSourceScan(table=[[testCatalog, default, testTable]],
fields=[count, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testResolvingSchemaOfCustomCatalogTableSql[streamingMode =
false]">
+ <Resource name="sql">
+ <![CDATA[SELECT COUNT(*) FROM testTable2 GROUP BY TUMBLE(rowtime,
INTERVAL '10' MINUTE)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(EXPR$0=[$1])
++- LogicalAggregate(group=[{0}], EXPR$0=[COUNT()])
+ +- LogicalProject($f0=[$TUMBLE($1, 600000:INTERVAL MINUTE)])
+ +- LogicalTableScan(table=[[testCatalog, default, testTable]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+HashWindowAggregate(window=[TumblingGroupWindow('w$, rowtime, 600000)],
select=[Final_COUNT(count1$0) AS EXPR$0])
++- Exchange(distribution=[single])
+ +- LocalHashWindowAggregate(window=[TumblingGroupWindow('w$, rowtime,
600000)], select=[Partial_COUNT(*) AS count1$0])
+ +- TableSourceScan(table=[[testCatalog, default, testTable,
project=[rowtime]]], fields=[rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testResolvingSchemaOfCustomCatalogTableSql[streamingMode =
true]">
+ <Resource name="sql">
+ <![CDATA[SELECT COUNT(*) FROM testTable2 GROUP BY TUMBLE(rowtime,
INTERVAL '10' MINUTE)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(EXPR$0=[$1])
++- LogicalAggregate(group=[{0}], EXPR$0=[COUNT()])
+ +- LogicalProject($f0=[$TUMBLE($1, 600000:INTERVAL MINUTE)])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($1,
5000:INTERVAL SECOND)])
+ +- LogicalProject(count=[$0], rowtime=[$1], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[testCatalog, default, testTable]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+GroupWindowAggregate(window=[TumblingGroupWindow('w$, rowtime, 600000)],
select=[COUNT(*) AS EXPR$0])
++- Exchange(distribution=[single])
+ +- Calc(select=[rowtime])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime,
5000:INTERVAL SECOND)])
+ +- Calc(select=[count, rowtime, PROCTIME() AS proctime])
+ +- TableSourceScan(table=[[testCatalog, default, testTable]],
fields=[count, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testResolvingSchemaOfCustomCatalogTableTableApi[streamingMode = false]">
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(EXPR$0=[$0])
++- LogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($3)],
window=[TumblingGroupWindow('w, rowtime, 600000)], properties=[])
+ +- LogicalProject(count=[$0], rowtime=[$1], proctime=[PROCTIME()], $f3=[1])
+ +- LogicalTableScan(table=[[testCatalog, default, testTable]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+HashWindowAggregate(window=[TumblingGroupWindow('w, rowtime, 600000)],
select=[Final_COUNT(count$0) AS EXPR$0])
++- Exchange(distribution=[single])
+ +- LocalHashWindowAggregate(window=[TumblingGroupWindow('w, rowtime,
600000)], select=[Partial_COUNT($f3) AS count$0])
+ +- Calc(select=[count, rowtime, PROCTIME() AS proctime, 1 AS $f3])
+ +- TableSourceScan(table=[[testCatalog, default, testTable]],
fields=[count, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+</Root>
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
index 6226dff..90f2eb6 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
@@ -22,6 +22,7 @@ import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.calcite.FlinkTypeFactory;
+import org.apache.flink.table.catalog.CatalogManager.TableLookupResult;
import org.apache.flink.table.factories.TableFactory;
import org.apache.flink.table.factories.TableFactoryUtil;
import org.apache.flink.table.factories.TableSourceFactory;
@@ -79,7 +80,6 @@ class DatabaseCalciteSchema implements Schema {
ObjectIdentifier identifier = ObjectIdentifier.of(catalogName,
databaseName, tableName);
return catalogManager.getTable(identifier)
.map(result -> {
- CatalogBaseTable table = result.getTable();
final TableFactory tableFactory;
if (result.isTemporary()) {
tableFactory = null;
@@ -88,36 +88,49 @@ class DatabaseCalciteSchema implements Schema {
.flatMap(Catalog::getTableFactory)
.orElse(null);
}
- return convertTable(identifier, table,
tableFactory);
+ return convertTable(identifier, result,
tableFactory);
})
.orElse(null);
}
- private Table convertTable(ObjectIdentifier identifier,
CatalogBaseTable table, @Nullable TableFactory tableFactory) {
+ private Table convertTable(ObjectIdentifier identifier,
TableLookupResult lookupResult, @Nullable TableFactory tableFactory) {
+ CatalogBaseTable table = lookupResult.getTable();
+ TableSchema resolvedSchema = lookupResult.getResolvedSchema();
if (table instanceof QueryOperationCatalogView) {
- return
QueryOperationCatalogViewTable.createCalciteTable(((QueryOperationCatalogView)
table));
+ return
QueryOperationCatalogViewTable.createCalciteTable(
+ ((QueryOperationCatalogView) table),
+ resolvedSchema);
} else if (table instanceof ConnectorCatalogTable) {
- return convertConnectorTable((ConnectorCatalogTable<?,
?>) table);
- } else if (table instanceof CatalogTable) {
- return convertCatalogTable(identifier, (CatalogTable)
table, tableFactory);
- } else if (table instanceof CatalogView) {
- return convertCatalogView(identifier.getObjectName(),
(CatalogView) table);
+ return convertConnectorTable((ConnectorCatalogTable<?,
?>) table, resolvedSchema);
} else {
- throw new TableException("Unsupported table type: " +
table);
+ if (table instanceof CatalogTable) {
+ return convertCatalogTable(
+ identifier,
+ (CatalogTable) table,
+ resolvedSchema,
+ tableFactory);
+ } else if (table instanceof CatalogView) {
+ return convertCatalogView(
+ identifier.getObjectName(),
+ (CatalogView) table,
+ resolvedSchema);
+ } else {
+ throw new TableException("Unsupported table
type: " + table);
+ }
}
}
- private Table convertConnectorTable(ConnectorCatalogTable<?, ?> table) {
- Optional<TableSourceTable> tableSourceTable =
table.getTableSource()
+ private Table convertConnectorTable(ConnectorCatalogTable<?, ?> table,
TableSchema resolvedSchema) {
+ Optional<TableSourceTable<?>> tableSourceTable =
table.getTableSource()
.map(tableSource -> new TableSourceTable<>(
- table.getSchema(),
+ resolvedSchema,
tableSource,
!table.isBatch(),
FlinkStatistic.UNKNOWN()));
if (tableSourceTable.isPresent()) {
return tableSourceTable.get();
} else {
- Optional<TableSinkTable> tableSinkTable =
table.getTableSink()
+ Optional<TableSinkTable<?>> tableSinkTable =
table.getTableSink()
.map(tableSink -> new TableSinkTable<>(
tableSink,
FlinkStatistic.UNKNOWN()));
@@ -130,13 +143,17 @@ class DatabaseCalciteSchema implements Schema {
}
}
- private Table convertCatalogTable(ObjectIdentifier identifier,
CatalogTable table, @Nullable TableFactory tableFactory) {
+ private Table convertCatalogTable(
+ ObjectIdentifier identifier,
+ CatalogTable table,
+ TableSchema resolvedSchema,
+ @Nullable TableFactory tableFactory) {
final TableSource<?> tableSource;
final TableSourceFactory.Context context = new
TableSourceFactoryContextImpl(
identifier, table,
tableConfig.getConfiguration());
if (tableFactory != null) {
if (tableFactory instanceof TableSourceFactory) {
- tableSource = ((TableSourceFactory)
tableFactory).createTableSource(context);
+ tableSource = ((TableSourceFactory<?>)
tableFactory).createTableSource(context);
} else {
throw new TableException(
"Cannot query a sink-only table.
TableFactory provided by catalog must implement TableSourceFactory");
@@ -150,7 +167,7 @@ class DatabaseCalciteSchema implements Schema {
}
return new TableSourceTable<>(
- table.getSchema(),
+ resolvedSchema,
tableSource,
// this means the TableSource extends from
StreamTableSource, this is needed for the
// legacy Planner. Blink Planner should use the
information that comes from the TableSource
@@ -160,11 +177,10 @@ class DatabaseCalciteSchema implements Schema {
);
}
- private Table convertCatalogView(String tableName, CatalogView table) {
- TableSchema schema = table.getSchema();
+ private Table convertCatalogView(String tableName, CatalogView table,
TableSchema resolvedSchema) {
return new ViewTable(
null,
- typeFactory -> ((FlinkTypeFactory)
typeFactory).buildLogicalRowType(schema),
+ typeFactory -> ((FlinkTypeFactory)
typeFactory).buildLogicalRowType(resolvedSchema),
table.getExpandedQuery(),
Arrays.asList(catalogName, databaseName),
Arrays.asList(catalogName, databaseName, tableName)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogViewTable.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogViewTable.java
index dfcced1..bbbb6d4 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogViewTable.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogViewTable.java
@@ -51,12 +51,13 @@ public class QueryOperationCatalogViewTable extends
AbstractTable implements Tra
private final QueryOperationCatalogView catalogView;
private final RelProtoDataType rowType;
- public static QueryOperationCatalogViewTable
createCalciteTable(QueryOperationCatalogView catalogView) {
+ public static QueryOperationCatalogViewTable createCalciteTable(
+ QueryOperationCatalogView catalogView,
+ TableSchema resolvedSchema) {
return new QueryOperationCatalogViewTable(catalogView,
typeFactory -> {
- TableSchema tableSchema = catalogView.getSchema();
final FlinkTypeFactory flinkTypeFactory =
(FlinkTypeFactory) typeFactory;
- final RelDataType relType =
flinkTypeFactory.buildLogicalRowType(tableSchema);
- Boolean[] nullables = tableSchema
+ final RelDataType relType =
flinkTypeFactory.buildLogicalRowType(resolvedSchema);
+ Boolean[] nullables = resolvedSchema
.getTableColumns()
.stream()
.map(c ->
c.getType().getLogicalType().isNullable())
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 4e554f2..db10922 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -482,7 +482,7 @@ abstract class TableEnvImpl(
val objectIdentifier: ObjectIdentifier =
catalogManager.qualifyIdentifier(identifier)
JavaScalaConversionUtil.toScala(catalogManager.getTable(objectIdentifier))
- .map(t => new CatalogQueryOperation(objectIdentifier,
t.getTable.getSchema))
+ .map(t => new CatalogQueryOperation(objectIdentifier,
t.getResolvedSchema))
}
override def listModules(): Array[String] = {