imaffe commented on a change in pull request #17937:
URL: https://github.com/apache/flink/pull/17937#discussion_r763963921
##########
File path:
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java
##########
@@ -126,63 +196,116 @@ void
pollMessageAfterTimeout(PulsarPartitionSplitReaderBase<String> splitReader)
"Couldn't poll message from Pulsar.");
}
- /** Create a split reader with max message 1, fetch timeout 1s. */
- protected abstract PulsarPartitionSplitReaderBase<String> splitReader();
+ @Test
+ void consumeMessageCreatedAfterHandleSplitChangesAndFetch() {
+ PulsarPartitionSplitReaderBase<String> splitReader = splitReader();
+ String topicName = randomAlphabetic(10);
+ seekStartPositionAndHandleSplit(splitReader, topicName, 0);
+ operator().sendMessage(topicNameWithPartition(topicName, 0), STRING,
randomAlphabetic(10));
+ fetchedMessages(splitReader, 1, true);
+ }
- /** JUnit5 extension for all the TestTemplate methods in this class. */
- public class PulsarSplitReaderInvocationContextProvider
- implements TestTemplateInvocationContextProvider {
+ @Test
+ void consumeMessageCreatedBeforeHandleSplitsChanges() {
+ PulsarPartitionSplitReaderBase<String> splitReader = splitReader();
+ String topicName = randomAlphabetic(10);
+ operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
+ seekStartPositionAndHandleSplit(splitReader, topicName, 0);
+ fetchedMessages(splitReader, 0, true);
+ }
- @Override
- public boolean supportsTestTemplate(ExtensionContext context) {
- return true;
- }
+ @Test
+ void
consumeMessageCreatedBeforeHandleSplitsChangesAndResetToEarliestPosition() {
+ PulsarPartitionSplitReaderBase<String> splitReader = splitReader();
+ String topicName = randomAlphabetic(10);
+ operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
+ seekStartPositionAndHandleSplit(splitReader, topicName, 0,
MessageId.earliest);
+ fetchedMessages(splitReader, NUM_RECORDS_PER_PARTITION, true);
+ }
- @Override
- public Stream<TestTemplateInvocationContext>
provideTestTemplateInvocationContexts(
- ExtensionContext context) {
- return Stream.of(new
PulsarSplitReaderInvocationContext(splitReader()));
- }
+ @Test
+ void
consumeMessageCreatedBeforeHandleSplitsChangesAndResetToLatestPosition() {
+ PulsarPartitionSplitReaderBase<String> splitReader = splitReader();
+ String topicName = randomAlphabetic(10);
+ operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
+ seekStartPositionAndHandleSplit(splitReader, topicName, 0,
MessageId.latest);
+ fetchedMessages(splitReader, 0, true);
}
- /** Parameter resolver for Split Reader. */
- public static class PulsarSplitReaderInvocationContext
- implements TestTemplateInvocationContext {
+ @Test
+ void
consumeMessageCreatedBeforeHandleSplitsChangesAndUseSecondLastMessageIdCursor()
{
+ PulsarPartitionSplitReaderBase<String> splitReader = splitReader();
+ String topicName = randomAlphabetic(10);
+ operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
+ MessageIdImpl lastMessageId =
+ (MessageIdImpl)
+ sneakyAdmin(
+ () ->
+ operator()
+ .admin()
+ .topics()
+ .getLastMessageId(
+
topicNameWithPartition(topicName, 0)));
+ // when doing seek directly on consumer, by default it includes the
specified messageId
+ seekStartPositionAndHandleSplit(
+ splitReader,
+ topicName,
+ 0,
+ new MessageIdImpl(
+ lastMessageId.getLedgerId(),
+ lastMessageId.getEntryId() - 1,
+ lastMessageId.getPartitionIndex()));
+ fetchedMessages(splitReader, 2, true);
+ }
- private final PulsarPartitionSplitReaderBase<?> splitReader;
+ @Test
+ void emptyTopic() {
+ PulsarPartitionSplitReaderBase<String> splitReader = splitReader();
+ String topicName = randomAlphabetic(10);
+ operator().createTopic(topicName, DEFAULT_PARTITIONS);
+ seekStartPositionAndHandleSplit(splitReader, topicName, 0);
+ fetchedMessages(splitReader, 0, true);
+ }
- public
PulsarSplitReaderInvocationContext(PulsarPartitionSplitReaderBase<?>
splitReader) {
- this.splitReader = checkNotNull(splitReader);
- }
+ @Test
+ void emptyTopicWithoutSeek() {
+ PulsarPartitionSplitReaderBase<String> splitReader = splitReader();
+ String topicName = randomAlphabetic(10);
+ operator().createTopic(topicName, DEFAULT_PARTITIONS);
+ handleSplit(splitReader, topicName, 0);
+ fetchedMessages(splitReader, 0, true);
+ }
- @Override
- public String getDisplayName(int invocationIndex) {
- return splitReader.getClass().getSimpleName();
+ @Test
+ void wakeupSplitReaderShouldNotCauseException() {
+ PulsarPartitionSplitReaderBase<String> splitReader = splitReader();
+ handleSplit(splitReader, "non-exist", 0);
+ AtomicReference<Throwable> error = new AtomicReference<>();
+ Thread t =
+ new Thread(
+ () -> {
+ try {
+ splitReader.fetch();
+ } catch (Throwable e) {
+ error.set(e);
+ }
+ },
+ "testWakeUp-thread");
+ t.start();
+ long deadline = System.currentTimeMillis() + 5000L;
+ while (t.isAlive() && System.currentTimeMillis() < deadline) {
+ splitReader.wakeUp();
+ sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
}
+ assertNull(error.get());
+ }
- @Override
- public List<Extension> getAdditionalExtensions() {
- return Collections.singletonList(
- new ParameterResolver() {
- @Override
- public boolean supportsParameter(
- ParameterContext parameterContext,
- ExtensionContext extensionContext)
- throws ParameterResolutionException {
- return parameterContext
- .getParameter()
- .getType()
-
.equals(PulsarPartitionSplitReaderBase.class);
- }
-
- @Override
- public Object resolveParameter(
- ParameterContext parameterContext,
- ExtensionContext extensionContext)
- throws ParameterResolutionException {
- return splitReader;
- }
- });
- }
+ @Test
+ void assignNoSplits() {
+ PulsarPartitionSplitReaderBase<String> splitReader = splitReader();
+ assertNull(fetchedMessage(splitReader));
}
+
+ /** Create a split reader with max message 1, fetch timeout 1s. */
+ protected abstract PulsarPartitionSplitReaderBase<String> splitReader();
Review comment:
@fapaul Do you mean using @TestTemplate and a custom
TestTemplateInvocationContextProvider? @syhily and I originally used
@TestTemplate with a TestTemplateInvocationContext, and then refactored to what
it looks like now (because we thought they were overkill in our case). But sure,
I can switch back to the TestTemplateInvocationContextProvider for extensibility.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]