[ 
https://issues.apache.org/jira/browse/FLINK-18378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17140536#comment-17140536
 ] 

Dawid Wysakowicz commented on FLINK-18378:
------------------------------------------

First of all, I have to admit I missed it during the review, therefore I am not 
100% sure why we need the check there.

We should not change the implementation of the {{CatalogTable}}. Theoretically 
{{Catalog}} can return custom implementation that it might need later on to 
instantiate the {{DynamicTableSource/Sink}}. That was the case for {{Hive}} 
some time ago (Now I see it is no longer the case}}. Something like:

{code}
Catalog cat = ...
Catalog cat.getTable();
....
DynamicTableSourceFactory factory = (DynamicTableSourceFactory) 
cat.getFactory();

// where the factory does:

DynamicTableSource createDynamicTableSource(Context context) {
  CatalogTable table = context.getCatalogTable();
  if (!(table instanceof MyCustomTable)) {
      throw new IllegalArgumentException("....");
  }
}

{code}

It is a good questions what do we want to do for views. Views should not have 
any computed columns, but they might have the watermark specification imo. So 
we should probably resolve them too. Which makes the check somewhat unnecessary.

I think we have two options:
1.  we add {{CatalogBaseTable#copy(TableSchema)}} and then do what 
[~nicholasjiang] was suggesting:
{code}
private CatalogBaseTable resolveTableSchema(CatalogBaseTable table) {
   if (!(table instanceof CatalogTable)) {
     return table;
  }
  CatalogTable catalogTable = (CatalogTable) table;
  TableSchema newTableSchema = schemaResolver.resolve(catalogTable.getSchema());
  return catalogTable.copy(newTableSchema);
}
{code} 

2. we do not add the {{copy}} method, but we pass the resolvedSchema as part of 
the {{TableLookup}}

> CatalogManager checks for CatalogTableImpl instead of CatalogTable
> ------------------------------------------------------------------
>
>                 Key: FLINK-18378
>                 URL: https://issues.apache.org/jira/browse/FLINK-18378
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.11.0
>            Reporter: Fabian Hueske
>            Priority: Major
>             Fix For: 1.11.0
>
>
> The {{CatalogManager}} checks for {{CatalogTableImpl}} instead of 
> {{CatalogTable}} to decide whether to resolve the table schema. See 
> https://github.com/apache/flink/blob/release-1.11/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java#L369
> Resolving the table schema adjusts the type of fields which are referenced by 
> watermarks, i.e., changes their type from {{TIMESTAMP(3)}} to {{TIMESTAMP(3) 
> ROWTIME}}. If table schema is not properly resolved some queries involving 
> time attributes will fail during type validation.
> However, {{CatalogTableImpl}} is an internal implementation of the public 
> {{CatalogTable}} interface. Hence, external {{Catalog}} implementations will 
> not work with {{CatalogTableImpl}} but rather {{CatalogTable}} and hence 
> might fail to work correctly with queries that involve event-time attributes.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to