imaffe commented on a change in pull request #17937:
URL: https://github.com/apache/flink/pull/17937#discussion_r763963921



##########
File path: 
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java
##########
@@ -126,63 +196,116 @@ void 
pollMessageAfterTimeout(PulsarPartitionSplitReaderBase<String> splitReader)
                 "Couldn't poll message from Pulsar.");
     }
 
-    /** Create a split reader with max message 1, fetch timeout 1s. */
-    protected abstract PulsarPartitionSplitReaderBase<String> splitReader();
+    @Test
+    void consumeMessageCreatedAfterHandleSplitChangesAndFetch() {
+        PulsarPartitionSplitReaderBase<String> splitReader = splitReader();
+        String topicName = randomAlphabetic(10);
+        seekStartPositionAndHandleSplit(splitReader, topicName, 0);
+        operator().sendMessage(topicNameWithPartition(topicName, 0), STRING, 
randomAlphabetic(10));
+        fetchedMessages(splitReader, 1, true);
+    }
 
-    /** JUnit5 extension for all the TestTemplate methods in this class. */
-    public class PulsarSplitReaderInvocationContextProvider
-            implements TestTemplateInvocationContextProvider {
+    @Test
+    void consumeMessageCreatedBeforeHandleSplitsChanges() {
+        PulsarPartitionSplitReaderBase<String> splitReader = splitReader();
+        String topicName = randomAlphabetic(10);
+        operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
+        seekStartPositionAndHandleSplit(splitReader, topicName, 0);
+        fetchedMessages(splitReader, 0, true);
+    }
 
-        @Override
-        public boolean supportsTestTemplate(ExtensionContext context) {
-            return true;
-        }
+    @Test
+    void 
consumeMessageCreatedBeforeHandleSplitsChangesAndResetToEarliestPosition() {
+        PulsarPartitionSplitReaderBase<String> splitReader = splitReader();
+        String topicName = randomAlphabetic(10);
+        operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
+        seekStartPositionAndHandleSplit(splitReader, topicName, 0, 
MessageId.earliest);
+        fetchedMessages(splitReader, NUM_RECORDS_PER_PARTITION, true);
+    }
 
-        @Override
-        public Stream<TestTemplateInvocationContext> 
provideTestTemplateInvocationContexts(
-                ExtensionContext context) {
-            return Stream.of(new 
PulsarSplitReaderInvocationContext(splitReader()));
-        }
+    @Test
+    void 
consumeMessageCreatedBeforeHandleSplitsChangesAndResetToLatestPosition() {
+        PulsarPartitionSplitReaderBase<String> splitReader = splitReader();
+        String topicName = randomAlphabetic(10);
+        operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
+        seekStartPositionAndHandleSplit(splitReader, topicName, 0, 
MessageId.latest);
+        fetchedMessages(splitReader, 0, true);
     }
 
-    /** Parameter resolver for Split Reader. */
-    public static class PulsarSplitReaderInvocationContext
-            implements TestTemplateInvocationContext {
+    @Test
+    void 
consumeMessageCreatedBeforeHandleSplitsChangesAndUseSecondLastMessageIdCursor() 
{
+        PulsarPartitionSplitReaderBase<String> splitReader = splitReader();
+        String topicName = randomAlphabetic(10);
+        operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
+        MessageIdImpl lastMessageId =
+                (MessageIdImpl)
+                        sneakyAdmin(
+                                () ->
+                                        operator()
+                                                .admin()
+                                                .topics()
+                                                .getLastMessageId(
+                                                        
topicNameWithPartition(topicName, 0)));
+        // when doing seek directly on consumer, by default it includes the 
specified messageId
+        seekStartPositionAndHandleSplit(
+                splitReader,
+                topicName,
+                0,
+                new MessageIdImpl(
+                        lastMessageId.getLedgerId(),
+                        lastMessageId.getEntryId() - 1,
+                        lastMessageId.getPartitionIndex()));
+        fetchedMessages(splitReader, 2, true);
+    }
 
-        private final PulsarPartitionSplitReaderBase<?> splitReader;
+    @Test
+    void emptyTopic() {
+        PulsarPartitionSplitReaderBase<String> splitReader = splitReader();
+        String topicName = randomAlphabetic(10);
+        operator().createTopic(topicName, DEFAULT_PARTITIONS);
+        seekStartPositionAndHandleSplit(splitReader, topicName, 0);
+        fetchedMessages(splitReader, 0, true);
+    }
 
-        public 
PulsarSplitReaderInvocationContext(PulsarPartitionSplitReaderBase<?> 
splitReader) {
-            this.splitReader = checkNotNull(splitReader);
-        }
+    @Test
+    void emptyTopicWithoutSeek() {
+        PulsarPartitionSplitReaderBase<String> splitReader = splitReader();
+        String topicName = randomAlphabetic(10);
+        operator().createTopic(topicName, DEFAULT_PARTITIONS);
+        handleSplit(splitReader, topicName, 0);
+        fetchedMessages(splitReader, 0, true);
+    }
 
-        @Override
-        public String getDisplayName(int invocationIndex) {
-            return splitReader.getClass().getSimpleName();
+    @Test
+    void wakeupSplitReaderShouldNotCauseException() {
+        PulsarPartitionSplitReaderBase<String> splitReader = splitReader();
+        handleSplit(splitReader, "non-exist", 0);
+        AtomicReference<Throwable> error = new AtomicReference<>();
+        Thread t =
+                new Thread(
+                        () -> {
+                            try {
+                                splitReader.fetch();
+                            } catch (Throwable e) {
+                                error.set(e);
+                            }
+                        },
+                        "testWakeUp-thread");
+        t.start();
+        long deadline = System.currentTimeMillis() + 5000L;
+        while (t.isAlive() && System.currentTimeMillis() < deadline) {
+            splitReader.wakeUp();
+            sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
         }
+        assertNull(error.get());
+    }
 
-        @Override
-        public List<Extension> getAdditionalExtensions() {
-            return Collections.singletonList(
-                    new ParameterResolver() {
-                        @Override
-                        public boolean supportsParameter(
-                                ParameterContext parameterContext,
-                                ExtensionContext extensionContext)
-                                throws ParameterResolutionException {
-                            return parameterContext
-                                    .getParameter()
-                                    .getType()
-                                    
.equals(PulsarPartitionSplitReaderBase.class);
-                        }
-
-                        @Override
-                        public Object resolveParameter(
-                                ParameterContext parameterContext,
-                                ExtensionContext extensionContext)
-                                throws ParameterResolutionException {
-                            return splitReader;
-                        }
-                    });
-        }
+    @Test
+    void assignNoSplits() {
+        PulsarPartitionSplitReaderBase<String> splitReader = splitReader();
+        assertNull(fetchedMessage(splitReader));
     }
+
+    /** Create a split reader with max message 1, fetch timeout 1s. */
+    protected abstract PulsarPartitionSplitReaderBase<String> splitReader();

Review comment:
       @fapaul Do you mean using @TestTemplate and a custom 
TestInvocationContextProvider? @syhily and I used @TestTemplate and 
TestInvocationContext at first, and then refactored to what it looks like now 
(because we thought they were overkill in our case). But sure, I can switch 
back to using the TestInvocationContextProvider for extensibility.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to