This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3358cd3  [FLINK-13047][table] Fix the Optional.orElse() usage issue in 
DatabaseCalciteSchema
3358cd3 is described below

commit 3358cd3b6d6ffebb313b6c02f2a53e6dbd6ec1ed
Author: Xuefu Zhang <[email protected]>
AuthorDate: Mon Jul 1 15:25:44 2019 -0700

    [FLINK-13047][table] Fix the Optional.orElse() usage issue in 
DatabaseCalciteSchema
    
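    This PR fixes the Optional.orElse() usage issue in DatabaseCalciteSchema:
    orElse() evaluates its argument eagerly, so
    TableFactoryUtil.findAndCreateTableSource() was invoked even when the
    catalog provided its own TableFactory.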
    
    This closes #8940.
---
 .../apache/flink/table/catalog/DatabaseCalciteSchema.java | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
index e629978..95475ef 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
@@ -29,7 +29,6 @@ import org.apache.flink.table.plan.schema.TableSourceTable;
 import org.apache.flink.table.plan.stats.FlinkStatistic;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.table.sources.TableSource;
-import org.apache.flink.types.Row;
 
 import org.apache.calcite.linq4j.tree.Expression;
 import org.apache.calcite.rel.type.RelProtoDataType;
@@ -104,9 +103,19 @@ class DatabaseCalciteSchema implements Schema {
        }
 
        private Table convertCatalogTable(ObjectPath tablePath, CatalogTable 
table) {
+               TableSource<?> tableSource;
                Optional<TableFactory> tableFactory = catalog.getTableFactory();
-               TableSource<Row> tableSource = tableFactory.map(tf -> 
((TableSourceFactory) tf).createTableSource(tablePath, table))
-                       
.orElse(TableFactoryUtil.findAndCreateTableSource(table));
+               if (tableFactory.isPresent()) {
+                       TableFactory tf = tableFactory.get();
+                       if (tf instanceof TableSourceFactory) {
+                               tableSource = ((TableSourceFactory) 
tf).createTableSource(tablePath, table);
+                       } else {
+                               throw new TableException(String.format("Cannot 
query a sink-only table. TableFactory provided by catalog %s must implement 
TableSourceFactory",
+                                       catalog.getClass()));
+                       }
+               } else {
+                       tableSource = 
TableFactoryUtil.findAndCreateTableSource(table);
+               }
 
                if (!(tableSource instanceof StreamTableSource)) {
                        throw new TableException("Catalog tables support only 
StreamTableSource and InputFormatTableSource");

Reply via email to