wuchong commented on a change in pull request #16242:
URL: https://github.com/apache/flink/pull/16242#discussion_r656734888



##########
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertSinkFunctionTest.java
##########
@@ -256,17 +273,17 @@ private BufferedUpsertSinkFunction createBufferedSink(MockedSinkFunction sinkFun
                         new int[] {keyIndices},
                         typeInformation,
                         BUFFER_FLUSH_MODE);
+        bufferedSinkFunction.getRuntimeContext().getExecutionConfig().enableObjectReuse();

Review comment:
       Object reuse is already enabled in the underlying sink function, so 
there is no need to set it here again. 

##########
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertSinkFunctionTest.java
##########
@@ -129,17 +140,23 @@
                 TimestampData.fromInstant(Instant.parse("2021-03-30T21:00:00Z")))
     };
 
+    private final boolean enableObjectReuse;
+
+    public BufferedUpsertSinkFunctionTest(boolean enableObjectReuse) {
+        this.enableObjectReuse = enableObjectReuse;
+    }
+
     @Test
     public void testWriteData() throws Exception {
-        MockedSinkFunction sinkFunction = new MockedSinkFunction();
+        MockedSinkFunction sinkFunction = new MockedSinkFunction(enableObjectReuse);

Review comment:
       You can make `MockedSinkFunction` a non-static inner class so that it 
can read the test's `enableObjectReuse` field directly, instead of passing the 
flag to every constructor call. 
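
A minimal sketch of what that could look like, using simplified stand-in classes rather than the actual test code. `InnerClassSketch`, `MockedSink`, `createSink` and `objectReuseEnabled` are hypothetical names chosen for illustration only:

```java
// Hypothetical, simplified stand-in for the parameterized test class;
// this is not the actual Flink test code.
public class InnerClassSketch {

    private final boolean enableObjectReuse;

    public InnerClassSketch(boolean enableObjectReuse) {
        this.enableObjectReuse = enableObjectReuse;
    }

    // Non-static inner class: every instance is bound to an enclosing
    // InnerClassSketch instance and can read its fields directly.
    class MockedSink {
        boolean objectReuseEnabled() {
            // Resolves to InnerClassSketch.this.enableObjectReuse.
            return enableObjectReuse;
        }
    }

    MockedSink createSink() {
        // No constructor argument is needed to hand over the flag.
        return new MockedSink();
    }

    public static void main(String[] args) {
        System.out.println(new InnerClassSketch(true).createSink().objectReuseEnabled());
        System.out.println(new InnerClassSketch(false).createSink().objectReuseEnabled());
    }
}
```

The trade-off is that a non-static inner class holds an implicit reference to its enclosing instance, which should be acceptable in a test that creates the sink directly but is worth keeping in mind if the sink is ever serialized.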





