twalthr commented on a change in pull request #17384:
URL: https://github.com/apache/flink/pull/17384#discussion_r719319140



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/Module.java
##########
@@ -52,5 +54,47 @@
         return Optional.empty();
     }
 
+    /**
+     * Returns a {@link DynamicTableSourceFactory} for creating source tables.
+     *
+     * <p>A factory is determined with the following precedence rule:
+     *
+     * <ul>
+     *   <li>1. Factory provided by the corresponding catalog of a persisted table.
+     *   <li>2. Factory provided by a module.
+     *   <li>3. Factory discovered using Java SPI.
+     * </ul>
+     *
+     * <p>This will be called on loaded modules in the order in which they have been loaded. The
+     * first factory returned will be used.
+     *
+     * <p>This method can be useful to disable Java SPI completely or influence how temporary table
+     * sources should be created without a corresponding catalog.
+     */
+    default Optional<DynamicTableSourceFactory> getTableSourceFactory() {
+        return Optional.empty();
+    }
+
+    /**
+     * Returns a {@link DynamicTableSinkFactory} for creating sink tables.
+     *
+     * <p>A factory is determined with the following precedence rule:
+     *
+     * <ul>
+     *   <li>1. Factory provided by the corresponding catalog of a persisted table.
+     *   <li>2. Factory provided by a module.
+     *   <li>3. Factory discovered using Java SPI.
+     * </ul>
+     *
+     * <p>This will be called on loaded modules in the order in which they have been loaded. The
+     * first factory returned will be used.
+     *
+     * <p>This method can be useful to disable Java SPI completely or influence how temporary table
+     * sources should be created without a corresponding catalog.

Review comment:
       This Javadoc is for the sink factory, so it should say `sinks` here, not `sources`.
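
       For illustration, the lookup order described in the Javadoc above could be sketched roughly as follows. This is a simplified sketch, not the planner's actual code: `ModuleLike`, `resolve`, and the plain strings standing in for factories are hypothetical names introduced only for this example.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

public class FactoryPrecedenceSketch {

    /** Hypothetical stand-in for a module; a String stands in for the factory. */
    public interface ModuleLike {
        Optional<String> getTableSinkFactory();
    }

    /**
     * Applies the documented precedence:
     * 1. factory from the catalog, 2. first factory returned by a loaded module,
     * 3. fallback discovery (Java SPI in the real system).
     */
    public static String resolve(
            Optional<String> catalogFactory,
            List<ModuleLike> loadedModules,
            Supplier<String> spiDiscovery) {
        if (catalogFactory.isPresent()) {
            return catalogFactory.get();
        }
        // Modules are asked in load order; the first non-empty answer wins.
        for (ModuleLike module : loadedModules) {
            Optional<String> factory = module.getTableSinkFactory();
            if (factory.isPresent()) {
                return factory.get();
            }
        }
        // Only reached when neither the catalog nor any module provided a factory.
        return spiDiscovery.get();
    }
}
```

       A module that always returns a factory therefore keeps the SPI fallback from ever being reached, which is the "disable Java SPI completely" case mentioned in the Javadoc.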

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
##########
@@ -75,7 +75,7 @@ public StreamExecSink(
             String description) {
         super(
                 tableSinkSpec,
-                tableSinkSpec.getTableSink().getChangelogMode(inputChangelogMode),
+                inputChangelogMode,

Review comment:
       This is clearly a bug: the `inputChangelogMode` is only a suggestion for the sink. In the end, the sink decides the final mode.
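
       To illustrate why passing the input mode through unchanged is wrong, here is a minimal sketch. It uses simplified, hypothetical stand-ins (`RowKind`, `Sink`, `UPSERT_SINK`, `finalMode`) rather than Flink's real `ChangelogMode` and `DynamicTableSink` classes, and assumes an upsert-style sink that never requires `UPDATE_BEFORE` rows.

```java
import java.util.EnumSet;
import java.util.Set;

public class ChangelogModeSketch {

    /** Simplified stand-in for the kinds of changes a changelog can contain. */
    public enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Simplified stand-in for a sink that negotiates its changelog mode. */
    public interface Sink {
        Set<RowKind> getChangelogMode(Set<RowKind> requestedMode);
    }

    /** Hypothetical upsert-style sink: takes the suggestion but drops UPDATE_BEFORE. */
    public static final Sink UPSERT_SINK = requestedMode -> {
        Set<RowKind> accepted = EnumSet.noneOf(RowKind.class);
        accepted.addAll(requestedMode);
        accepted.remove(RowKind.UPDATE_BEFORE);
        return accepted;
    };

    /** The final mode is whatever the sink returns, not the suggested input mode. */
    public static Set<RowKind> finalMode(Sink sink, Set<RowKind> inputChangelogMode) {
        return sink.getChangelogMode(inputChangelogMode);
    }

    public static void main(String[] args) {
        Set<RowKind> input = EnumSet.allOf(RowKind.class);
        System.out.println("suggested: " + input);
        System.out.println("final:     " + finalMode(UPSERT_SINK, input));
    }
}
```

       With the changed line, the node would record the suggested mode instead of the mode the sink actually accepted, and the two can differ as shown.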

##########
File path: flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java
##########
@@ -165,7 +165,7 @@ public void testVerboseErrorMessage() throws Exception {
                 new String[] {
                     "org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'invalid'",
                     "at org.apache.flink.table.factories.FactoryUtil.discoverFactory",
-                    "at org.apache.flink.table.factories.FactoryUtil.createTableSource"
+                    "at org.apache.flink.table.factories.FactoryUtil.createSource"

Review comment:
       I guess this needs an update as well.





