twalthr commented on a change in pull request #18667:
URL: https://github.com/apache/flink/pull/18667#discussion_r804458083
##########
File path:
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/DeduplicationJsonPlanITCase.java
##########
@@ -71,6 +71,7 @@ public void testDeduplication() throws Exception {
tableEnv.getConfig()
.getConfiguration()
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+ checkTransformationUids(compiledPlan);
Review comment:
Is there a reason not to use `compileSqlAndExecutePlan`? I think the
configuration could also be set beforehand without side effects.
##########
File path:
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java
##########
@@ -83,46 +80,6 @@ public void testLegacyStreamSource() {
assertTrue(sourceTransform.getOperator().emitsProgressiveWatermarks());
}
- @Test
- public void testStreamTransformationScanProvider() {
- final JavaStreamTableTestUtil util = javaStreamTestUtil();
- final StreamTableEnvironment env = util.tableEnv();
-
- final Table table =
- env.from(
- TableDescriptor.forConnector("values")
- .option("bounded", "false")
- .schema(dummySchema())
- .build());
-
- final Transformation<RowData> transformation =
- env.toChangelogStream(table)
- .<RowData>map(r -> new GenericRowData(0))
- .getTransformation();
-
- assertFalse(TransformationScanProvider.of(transformation, "scan").isBounded());
- }
-
- @Test
- public void testBatchTransformationScanProvider() {
Review comment:
These tests are important: they verify the propagation of boundedness.
Please re-add them.
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/TransformationSinkProvider.java
##########
@@ -37,15 +40,24 @@
@Internal
public interface TransformationSinkProvider extends DynamicTableSink.SinkRuntimeProvider {
- /** Creates a transformation for transforming the input provided in the context. */
+ /**
+ * Creates a transformation for transforming the input provided in the context.
+ *
+ * <p>This method MUST set an uid for each node of the transformation sink, when the job is
+ * unbounded, which can be generated with {@link Context#generateUid(String)}.
+ */
Transformation<?> createTransformation(Context context);
/** Context for {@link #createTransformation(Context)}. */
- interface Context {
+ interface Context extends ProviderContext {
Review comment:
True, but if you implement a connector, you end up with several `Context`
classes, and the code could become quite messy.
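To make the uid contract from the Javadoc above concrete, here is a minimal sketch in plain Java. It uses hypothetical stand-in types, not the actual Flink interfaces: `UidSketch`, `Node`, `contextFor`, `createNode`, the `Optional<String>` return type, and the `prefix_name` uid format are all assumptions for illustration. Only the method name `generateUid(String)` comes from the diff.

```java
import java.util.Optional;

public class UidSketch {

    /** Hypothetical stand-in for a shared provider context; not the real Flink interface. */
    interface ProviderContext {
        // Assumed contract: returns a uid when one is required, empty otherwise.
        Optional<String> generateUid(String name);
    }

    /** Hypothetical factory: uids are only generated when the job is unbounded. */
    static ProviderContext contextFor(String nodePrefix, boolean unbounded) {
        return name -> unbounded ? Optional.of(nodePrefix + "_" + name) : Optional.empty();
    }

    /** Hypothetical stand-in for one node of a sink transformation. */
    static final class Node {
        final String name;
        String uid; // stays null when the context generates no uid

        Node(String name) {
            this.name = name;
        }
    }

    /** Creates a node and sets its uid if the context provides one. */
    static Node createNode(ProviderContext context, String name) {
        Node node = new Node(name);
        context.generateUid(name).ifPresent(uid -> node.uid = uid);
        return node;
    }

    public static void main(String[] args) {
        Node streaming = createNode(contextFor("sink_1", true), "writer");
        Node batch = createNode(contextFor("sink_1", false), "writer");
        System.out.println("unbounded uid: " + streaming.uid);
        System.out.println("bounded uid: " + batch.uid);
    }
}
```

In this sketch every provider receives the same small context type, so a connector author only has to learn one `generateUid` entry point regardless of how many `Context` classes the individual providers define.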