nlu90 commented on a change in pull request #11056:
URL: https://github.com/apache/pulsar/pull/11056#discussion_r658299807



##########
File path: 
pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
##########
@@ -167,55 +177,53 @@ public void seekPauseResumeTest() throws Exception {
         assertEquals(MessageIdUtils.getOffset(msgId), 
sink.currentOffset(tp.topic(), tp.partition()));
 
         sink.taskContext.offset(tp, 0);
-        verify(mockCtx, times(1)).seek(Mockito.anyString(), Mockito.anyInt(), 
any());
+        verify(context, times(1)).seek(Mockito.anyString(), Mockito.anyInt(), 
any());
         assertEquals(0, sink.currentOffset(tp.topic(), tp.partition()));
 
         sink.taskContext.pause(tp);
-        verify(mockCtx, times(1)).pause(tp.topic(), tp.partition());
+        verify(context, times(1)).pause(tp.topic(), tp.partition());
         sink.taskContext.resume(tp);
-        verify(mockCtx, times(1)).resume(tp.topic(), tp.partition());
+        verify(context, times(1)).resume(tp.topic(), tp.partition());
 
         sink.close();
     }
 
 
     @Test
     public void subscriptionTypeTest() throws Exception {
-        SinkContext mockCtx = Mockito.mock(SinkContext.class);
-        
when(mockCtx.getSubscriptionType()).thenReturn(SubscriptionType.Exclusive);
         try (KafkaConnectSink sink = new KafkaConnectSink()) {
-            log.info("Exclusive is allowed");
-            sink.open(props, mockCtx);
+            log.info("Failover is allowed");

Review comment:
       I switched the order of the first two tests since by default the 
subscriptiontype is `failover`. The exclusive test is actually moved to line 199

##########
File path: 
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
##########
@@ -136,9 +141,6 @@ void processMessage(Message<byte[]> message) {
     @Override
     public void start() {
         try {
-            client = PulsarClient.builder()
-                .serviceUrl(serviceUrl)
-                .build();
             log.info("Successfully created pulsar client to {}", serviceUrl);

Review comment:
       Yeah, plan to remove the `serviceUrl` after the current unit test fix. 
Thanks for the reminding.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to