wuchong commented on a change in pull request #8966: 
[FLINK-13074][table-planner-blink] Add PartitionableTableSink bridge logic to 
flink&blink …
URL: https://github.com/apache/flink/pull/8966#discussion_r305364606
 
 

 ##########
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
 ##########
 @@ -108,22 +109,36 @@ public Table getTable(String tableName) {
        }
 
        private Table convertConnectorTable(ConnectorCatalogTable<?, ?> table) {
-               return table.getTableSource()
-                               .map(tableSource -> {
-                                       if (!(tableSource instanceof 
StreamTableSource ||
-                                                       tableSource instanceof 
LookupableTableSource)) {
-                                               throw new TableException(
-                                                               "Only 
StreamTableSource and LookupableTableSource can be used in Blink planner.");
-                                       }
-                                       if (!isStreamingMode && tableSource 
instanceof StreamTableSource &&
-                                                       
!((StreamTableSource<?>) tableSource).isBounded()) {
-                                               throw new TableException("Only 
bounded StreamTableSource can be used in batch mode.");
-                                       }
-                                       return new TableSourceTable<>(
-                                                       tableSource,
-                                                       isStreamingMode,
-                                                       
FlinkStatistic.UNKNOWN());
-                               }).orElseThrow(() -> new TableException("Cannot 
query a sink only table."));
+               Optional<TableSourceTable> tableSourceTable = 
table.getTableSource()
+                       .map(tableSource -> {
+                               if (!(tableSource instanceof StreamTableSource 
||
+                                       tableSource instanceof 
LookupableTableSource)) {
+                                       throw new TableException(
+                                               "Only StreamTableSource and 
LookupableTableSource can be used in Blink planner.");
+                               }
+                               if (!isStreamingMode && tableSource instanceof 
StreamTableSource &&
+                                       !((StreamTableSource<?>) 
tableSource).isBounded()) {
+                                       throw new TableException("Only bounded 
StreamTableSource can be used in batch mode.");
+                               }
+                               return new TableSourceTable<>(
+                                       tableSource,
+                                       isStreamingMode,
+                                       FlinkStatistic.UNKNOWN());
+                       });
+               if (tableSourceTable.isPresent()) {
+                       return tableSourceTable.get();
+               } else {
+                       Optional<TableSinkTable> tableSinkTable = 
table.getTableSink()
+                               .map(tableSink -> new TableSinkTable<>(
+                                       tableSink,
+                                       FlinkStatistic.UNKNOWN()));
+                       if (tableSinkTable.isPresent()) {
+                               return tableSinkTable.get();
+                       } else {
+                               throw new TableException("Cannot convert a 
connector table " +
+                                       "without either source or sink.");
 
 Review comment:
   What compile error do you mean? 
   I mean replace the code from L131-L140.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to