CrynetLogistics commented on a change in pull request #17907:
URL: https://github.com/apache/flink/pull/17907#discussion_r778172167



##########
File path: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisDynamicTableFactoryTest.java
##########
@@ -186,12 +193,87 @@ public void testGoodTableSinkForNonPartitionedTable() {
 
         // Construct expected DynamicTableSink using factory under test
         KinesisDynamicSink expectedSink =
-                new KinesisDynamicSink(
-                        sinkSchema.toPhysicalRowDataType(),
-                        STREAM_NAME,
-                        defaultProducerProperties(),
-                        new TestFormatFactory.EncodingFormatMock(","),
-                        new RandomKinesisPartitioner<>());
+                (KinesisDynamicSink)
+                        new KinesisDynamicSink.KinesisDynamicTableSinkBuilder()
+                                
.setConsumedDataType(sinkSchema.toPhysicalRowDataType())
+                                .setStream(STREAM_NAME)
+                                
.setKinesisClientProperties(defaultProducerProperties())
+                                .setEncodingFormat(new 
TestFormatFactory.EncodingFormatMock(","))
+                                .setPartitioner(new 
RandomKinesisKeyGenerator<>())
+                                .build();
+
+        // verify that the constructed DynamicTableSink is as expected
+        assertEquals(expectedSink, actualSink);
+
+        // verify that the copy of the constructed DynamicTableSink is as 
expected
+        assertEquals(expectedSink.copy(), actualSink);
+
+        // verify the produced sink
+        DynamicTableSink.SinkRuntimeProvider sinkFunctionProvider =
+                actualSink.getSinkRuntimeProvider(new 
SinkRuntimeProviderContext(false));
+        Sink<RowData, ?, ?, ?> sinkFunction =
+                as(sinkFunctionProvider, SinkProvider.class).createSink();
+        assertThat(sinkFunction, instanceOf(KinesisDataStreamsSink.class));
+    }
+
+    @Test
+    public void testGoodTableSinkForNonPartitionedTableWithSinkOptions() {
+        ResolvedSchema sinkSchema = defaultSinkSchema();
+        Map<String, String> sinkOptions = 
defaultTableOptionsWithSinkOptions().build();
+
+        // Construct actual DynamicTableSink using FactoryUtil
+        KinesisDynamicSink actualSink =
+                (KinesisDynamicSink) createTableSink(sinkSchema, sinkOptions);
+
+        // Construct expected DynamicTableSink using factory under test
+        KinesisDynamicSink expectedSink =
+                (KinesisDynamicSink)
+                        getDefaultSinkBuilder()
+                                
.setConsumedDataType(sinkSchema.toPhysicalRowDataType())
+                                .setStream(STREAM_NAME)
+                                
.setKinesisClientProperties(defaultProducerProperties())
+                                .setEncodingFormat(new 
TestFormatFactory.EncodingFormatMock(","))
+                                .setPartitioner(new 
RandomKinesisKeyGenerator<>())
+                                .build();
+
+        // verify that the constructed DynamicTableSink is as expected
+        assertEquals(expectedSink, actualSink);
+
+        // verify that the copy of the constructed DynamicTableSink is as 
expected
+        assertEquals(expectedSink.copy(), actualSink);
+
+        // verify the produced sink
+        DynamicTableSink.SinkRuntimeProvider sinkFunctionProvider =
+                actualSink.getSinkRuntimeProvider(new 
SinkRuntimeProviderContext(false));
+        Sink<RowData, ?, ?, ?> sinkFunction =
+                as(sinkFunctionProvider, SinkProvider.class).createSink();
+        assertThat(sinkFunction, instanceOf(KinesisDataStreamsSink.class));
+    }
+
+    @Test
+    public void testGoodTableSinkForNonPartitionedTableWithProducerOptions() {
+        ResolvedSchema sinkSchema = defaultSinkSchema();
+        Map<String, String> sinkOptions = 
defaultTableOptionsWithDeprecatedOptions().build();
+
+        // Construct actual DynamicTableSink using FactoryUtil
+        KinesisDynamicSink actualSink =
+                (KinesisDynamicSink) createTableSink(sinkSchema, sinkOptions);
+
+        // Construct expected DynamicTableSink using factory under test
+        // Construct expected DynamicTableSink using factory under test

Review comment:
       Seems to be here twice ;)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to