This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 3358cd3 [FLINK-13047][table] Fix the Optional.orElse() usage issue in
DatabaseCalciteSchema
3358cd3 is described below
commit 3358cd3b6d6ffebb313b6c02f2a53e6dbd6ec1ed
Author: Xuefu Zhang <[email protected]>
AuthorDate: Mon Jul 1 15:25:44 2019 -0700
[FLINK-13047][table] Fix the Optional.orElse() usage issue in
DatabaseCalciteSchema
This PR fixes the Optional.orElse() usage issue in DatabaseCalciteSchem.
This closes #8940.
---
.../apache/flink/table/catalog/DatabaseCalciteSchema.java | 15 ++++++++++++---
1 file changed, 12 insertions(+), 3 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
index e629978..95475ef 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
@@ -29,7 +29,6 @@ import org.apache.flink.table.plan.schema.TableSourceTable;
import org.apache.flink.table.plan.stats.FlinkStatistic;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.sources.TableSource;
-import org.apache.flink.types.Row;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.rel.type.RelProtoDataType;
@@ -104,9 +103,19 @@ class DatabaseCalciteSchema implements Schema {
}
private Table convertCatalogTable(ObjectPath tablePath, CatalogTable
table) {
+ TableSource<?> tableSource;
Optional<TableFactory> tableFactory = catalog.getTableFactory();
- TableSource<Row> tableSource = tableFactory.map(tf ->
((TableSourceFactory) tf).createTableSource(tablePath, table))
-
.orElse(TableFactoryUtil.findAndCreateTableSource(table));
+ if (tableFactory.isPresent()) {
+ TableFactory tf = tableFactory.get();
+ if (tf instanceof TableSourceFactory) {
+ tableSource = ((TableSourceFactory)
tf).createTableSource(tablePath, table);
+ } else {
+ throw new TableException(String.format("Cannot
query a sink-only table. TableFactory provided by catalog %s must implement
TableSourceFactory",
+ catalog.getClass()));
+ }
+ } else {
+ tableSource =
TableFactoryUtil.findAndCreateTableSource(table);
+ }
if (!(tableSource instanceof StreamTableSource)) {
throw new TableException("Catalog tables support only
StreamTableSource and InputFormatTableSource");