yunfengzhou-hub commented on code in PR #4381:
URL: https://github.com/apache/paimon/pull/4381#discussion_r1821938741
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java:
##########
@@ -243,16 +257,34 @@ static Table buildPaimonTable(DynamicTableFactory.Context context) {
newOptions.putAll(origin.getOptions());
newOptions.putAll(dynamicOptions);
- // notice that the Paimon table schema must be the same with the Flink's
- if (origin instanceof DataCatalogTable) {
- FileStoreTable fileStoreTable = (FileStoreTable) ((DataCatalogTable) origin).table();
- table = fileStoreTable.copyWithoutTimeTravel(newOptions);
+ FileStoreTable fileStoreTable;
+
+ // The following if conditions provide a shortcut to acquire Paimon table.
+ if (origin instanceof FormatCatalogTable) {
+ fileStoreTable = (FileStoreTable) ((FormatCatalogTable) origin).table();
+ } else if (origin instanceof DataCatalogTable) {
+ fileStoreTable = (FileStoreTable) ((DataCatalogTable) origin).table();
+ } else if (flinkCatalog == null) {
+ LOG.warn(
+ "FlinkCatalog is null. The process to find out Paimon table might be incorrect.");
+ fileStoreTable = FileStoreTableFactory.create(createCatalogContext(context));
} else {
- table =
- FileStoreTableFactory.create(createCatalogContext(context))
- .copyWithoutTimeTravel(newOptions);
+ // In case the shortcut is not matched, the paimon table can still be acquired from
+ // catalog.
+ Identifier identifier =
+ Identifier.create(
+ context.getObjectIdentifier().getDatabaseName(),
+ context.getObjectIdentifier().getObjectName());
+ try {
+ fileStoreTable = (FileStoreTable) flinkCatalog.catalog().getTable(identifier);
Review Comment:
The return type of
`DynamicTableFactory.Context#getCatalogTable().getOrigin()` is `CatalogTable`,
and `CatalogMaterializedTable` is not assignable from `CatalogTable`.
The problem I want to solve is in the following code:
```java
if (origin instanceof DataCatalogTable) {
    FileStoreTable fileStoreTable = (FileStoreTable) ((DataCatalogTable) origin).table();
    table = fileStoreTable.copyWithoutTimeTravel(newOptions);
} else {
    table =
            FileStoreTableFactory.create(createCatalogContext(context))
                    .copyWithoutTimeTravel(newOptions);
}
```
The Flink Paimon catalog assumes that if the origin table is not a
`DataCatalogTable`, then it must support `FileStoreTableFactory#create` (i.e.
the scheme of the origin table's path URL can be matched with a `FileIOLoader`).
In practice, however, I encountered a custom `FileIO` that does not support SPI
dynamic loading, so `FileStoreTableFactory#create` throws an exception for such
tables.
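
To make the failure mode concrete, here is a minimal, self-contained sketch of the idea. It does not use the real Paimon or Flink classes: `CatalogLookup`, `SPI_LOADERS`, `createFromPath`, and `resolve` are hypothetical stand-ins, and the `custom://` scheme is only an example of a scheme with no SPI-registered loader. It shows why a path-based lookup can fail while a lookup through an already-configured catalog still works.

```java
import java.net.URI;
import java.util.Map;

class TableResolutionSketch {

    /** Hypothetical stand-in for a catalog that already owns a working FileIO. */
    interface CatalogLookup {
        String getTable(String database, String table);
    }

    /** Stand-in for the loaders that SPI discovery would find, keyed by URL scheme. */
    static final Map<String, String> SPI_LOADERS =
            Map.of("file", "LocalFileIO", "hdfs", "HadoopFileIO");

    /** Mimics path-based creation: fails when no loader matches the path's scheme. */
    static String createFromPath(String path) {
        String scheme = URI.create(path).getScheme();
        String loader = scheme == null ? null : SPI_LOADERS.get(scheme);
        if (loader == null) {
            throw new IllegalStateException("No loader registered for scheme: " + scheme);
        }
        return "table created from path via " + loader;
    }

    /** Prefers the catalog when one is available; otherwise falls back to the path. */
    static String resolve(CatalogLookup catalog, String database, String table, String path) {
        if (catalog != null) {
            return catalog.getTable(database, table);
        }
        return createFromPath(path);
    }

    public static void main(String[] args) {
        CatalogLookup catalog = (db, t) -> "table " + db + "." + t + " loaded from catalog";
        String customPath = "custom://bucket/warehouse/db.db/t";

        // With a catalog, the unregistered scheme is never consulted.
        System.out.println(resolve(catalog, "db", "t", customPath));

        // Without a catalog, the path-based fallback hits the missing loader.
        try {
            resolve(null, "db", "t", customPath);
        } catch (IllegalStateException e) {
            System.out.println("Fallback failed: " + e.getMessage());
        }
    }
}
```

The ordering in `resolve` mirrors the intent of the change in this PR: consult the catalog first, and only fall back to scheme-based creation when no catalog is available.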