This is an automated email from the ASF dual-hosted git repository. fpaul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3fe1de954bf3ee05247cdff0c43d71dc9535803d Author: Yufei Zhang <[email protected]> AuthorDate: Tue Dec 7 20:41:06 2021 +0800 [FLINK-25044][pulsar][test]: improve unit test for pulsar source --- .../pulsar/source/PulsarSourceBuilder.java | 2 +- .../reader/source/PulsarOrderedSourceReader.java | 3 +- .../reader/source/PulsarUnorderedSourceReader.java | 3 +- .../pulsar/source/PulsarSourceBuilderTest.java | 102 ++++- .../enumerator/PulsarSourceEnumeratorTest.java | 439 +++++++++++++++++---- .../source/PulsarOrderedSourceReaderTest.java | 191 +++++++++ .../reader/source/PulsarSourceReaderTestBase.java | 245 ++++++++++++ .../PulsarUnorderedSourceReaderTest.java} | 20 +- .../PulsarOrderedPartitionSplitReaderTest.java | 74 +++- .../split/PulsarPartitionSplitReaderTestBase.java | 280 +++++++++++-- .../PulsarUnorderedPartitionSplitReaderTest.java | 16 +- .../pulsar/testutils/PulsarTestCommonUtils.java | 72 ++++ .../extension/SubType.java} | 27 +- .../extension/TestOrderlinessExtension.java | 65 +++ .../testutils/runtime/PulsarRuntimeOperator.java | 10 +- 15 files changed, 1373 insertions(+), 176 deletions(-) diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java index e539a0c..dd7f41e 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java @@ -426,7 +426,7 @@ public final class PulsarSourceBuilder<OUT> { if (rangeGenerator == null) { LOG.warn( "No range generator provided for key_shared subscription," - + " we would use the DivideRangeGenerator as the default range generator."); + + " we would use the UniformRangeGenerator as the default range generator."); 
this.rangeGenerator = new UniformRangeGenerator(); } } else { diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReader.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReader.java index 9b5c743..db62eb3 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReader.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReader.java @@ -19,6 +19,7 @@ package org.apache.flink.connector.pulsar.source.reader.source; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.connector.source.ReaderOutput; import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.configuration.Configuration; @@ -61,7 +62,7 @@ import java.util.function.Supplier; public class PulsarOrderedSourceReader<OUT> extends PulsarSourceReaderBase<OUT> { private static final Logger LOG = LoggerFactory.getLogger(PulsarOrderedSourceReader.class); - private final SortedMap<Long, Map<TopicPartition, MessageId>> cursorsToCommit; + @VisibleForTesting final SortedMap<Long, Map<TopicPartition, MessageId>> cursorsToCommit; private final ConcurrentMap<TopicPartition, MessageId> cursorsOfFinishedSplits; private final AtomicReference<Throwable> cursorCommitThrowable = new AtomicReference<>(); private ScheduledExecutorService cursorScheduler; diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java index 8d18b1d..ce57a00 100644 --- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java @@ -19,6 +19,7 @@ package org.apache.flink.connector.pulsar.source.reader.source; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; @@ -56,7 +57,7 @@ public class PulsarUnorderedSourceReader<OUT> extends PulsarSourceReaderBase<OUT private static final Logger LOG = LoggerFactory.getLogger(PulsarUnorderedSourceReader.class); @Nullable private final TransactionCoordinatorClient coordinatorClient; - private final SortedMap<Long, List<TxnID>> transactionsToCommit; + @VisibleForTesting final SortedMap<Long, List<TxnID>> transactionsToCommit; private final List<TxnID> transactionsOfFinishedSplits; public PulsarUnorderedSourceReader( diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilderTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilderTest.java index 139b546..b96173d 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilderTest.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilderTest.java @@ -18,12 +18,16 @@ package org.apache.flink.connector.pulsar.source; +import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.pulsar.source.enumerator.topic.range.UniformRangeGenerator; +import org.apache.pulsar.client.api.Schema; import 
org.apache.pulsar.client.api.SubscriptionType; import org.junit.jupiter.api.Test; -import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema; +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Unit tests for {@link PulsarSourceBuilder}. */ @SuppressWarnings("java:S5778") @@ -32,22 +36,22 @@ class PulsarSourceBuilderTest { @Test void someSetterMethodCouldOnlyBeCalledOnce() { PulsarSourceBuilder<String> builder = new PulsarSourceBuilder<>(); - assertThrows( - IllegalArgumentException.class, - () -> builder.setAdminUrl("admin-url").setAdminUrl("admin-url2")); - assertThrows( - IllegalArgumentException.class, - () -> builder.setServiceUrl("service-url").setServiceUrl("service-url2")); - assertThrows( - IllegalArgumentException.class, - () -> - builder.setSubscriptionName("set_subscription_name") - .setSubscriptionName("set_subscription_name2")); - assertThrows( - IllegalArgumentException.class, - () -> - builder.setSubscriptionType(SubscriptionType.Exclusive) - .setSubscriptionType(SubscriptionType.Shared)); + assertThatThrownBy(() -> builder.setAdminUrl("admin-url").setAdminUrl("admin-url2")) + .isInstanceOf(IllegalArgumentException.class); + + assertThatThrownBy(() -> builder.setServiceUrl("service-url").setServiceUrl("service-url2")) + .isInstanceOf(IllegalArgumentException.class); + + assertThatThrownBy( + () -> + builder.setSubscriptionName("set_subscription_name") + .setSubscriptionName("set_subscription_name2")) + .isInstanceOf(IllegalArgumentException.class); + assertThatThrownBy( + () -> + builder.setSubscriptionType(SubscriptionType.Exclusive) + .setSubscriptionType(SubscriptionType.Shared)) + .isInstanceOf(IllegalArgumentException.class); } @Test @@ -55,7 +59,8 @@ class PulsarSourceBuilderTest { PulsarSourceBuilder<String> builder = new 
PulsarSourceBuilder<>(); builder.setTopics("a", "b", "c"); - assertThrows(IllegalStateException.class, () -> builder.setTopicPattern("a-a-a")); + assertThatThrownBy(() -> builder.setTopicPattern("a-a-a")) + .isInstanceOf(IllegalStateException.class); } @Test @@ -63,8 +68,63 @@ class PulsarSourceBuilderTest { PulsarSourceBuilder<String> builder = new PulsarSourceBuilder<>(); builder.setSubscriptionType(SubscriptionType.Shared); - assertThrows( - IllegalArgumentException.class, - () -> builder.setRangeGenerator(new UniformRangeGenerator())); + assertThatThrownBy(() -> builder.setRangeGenerator(new UniformRangeGenerator())) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void missingRequiredField() { + PulsarSourceBuilder<String> builder = new PulsarSourceBuilder<>(); + assertThatThrownBy(builder::build).isInstanceOf(IllegalArgumentException.class); + builder.setAdminUrl("admin-url"); + assertThatThrownBy(builder::build).isInstanceOf(IllegalArgumentException.class); + builder.setServiceUrl("service-url"); + assertThatThrownBy(builder::build).isInstanceOf(IllegalArgumentException.class); + builder.setSubscriptionName("subscription-name"); + assertThatThrownBy(builder::build).isInstanceOf(NullPointerException.class); + builder.setTopics("topic"); + assertThatThrownBy(builder::build).isInstanceOf(NullPointerException.class); + builder.setDeserializationSchema(pulsarSchema(Schema.STRING)); + assertThatCode(builder::build).doesNotThrowAnyException(); + } + + @Test + void defaultBuilder() { + PulsarSourceBuilder<String> builder = new PulsarSourceBuilder<>(); + assertThatThrownBy(builder::build).isInstanceOf(IllegalArgumentException.class); + } + + @Test + void subscriptionTypeShouldNotBeOverriddenBySetMethod() { + PulsarSourceBuilder<String> builder = new PulsarSourceBuilder<>(); + fillRequiredFields(builder); + + Configuration config = new Configuration(); + config.set(PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE, SubscriptionType.Shared); + 
builder.setConfig(config); + + assertThatThrownBy(() -> builder.setSubscriptionType(SubscriptionType.Failover)) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void subscriptionTypeShouldNotBeOverriddenByConfiguration() { + PulsarSourceBuilder<String> builder = new PulsarSourceBuilder<>(); + fillRequiredFields(builder); + + builder.setSubscriptionType(SubscriptionType.Failover); + + Configuration config = new Configuration(); + config.set(PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE, SubscriptionType.Shared); + assertThatThrownBy(() -> builder.setConfig(config)) + .isInstanceOf(IllegalArgumentException.class); + } + + private void fillRequiredFields(PulsarSourceBuilder<String> builder) { + builder.setAdminUrl("admin-url"); + builder.setServiceUrl("service-url"); + builder.setSubscriptionName("subscription-name"); + builder.setTopics("topic"); + builder.setDeserializationSchema(pulsarSchema(Schema.STRING)); } } diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java index a4cc0a4..c52c514 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java @@ -18,117 +18,326 @@ package org.apache.flink.connector.pulsar.source.enumerator; +import org.apache.flink.api.connector.source.ReaderInfo; import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor; import 
org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange; import org.apache.flink.connector.pulsar.source.enumerator.topic.range.FullRangeGenerator; import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; import org.apache.flink.connector.pulsar.testutils.PulsarTestSuiteBase; +import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator; import org.apache.flink.shaded.guava30.com.google.common.collect.Maps; import org.apache.flink.shaded.guava30.com.google.common.collect.Sets; import org.apache.pulsar.client.api.RegexSubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; -import java.util.ArrayList; -import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; +import java.util.stream.Collectors; +import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE; import static org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor.latest; import static org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber.getTopicPatternSubscriber; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; +import 
static org.apache.flink.shaded.guava30.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; +import static org.assertj.core.api.Assertions.assertThat; /** Unit tests for {@link PulsarSourceEnumerator}. */ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase { private static final int NUM_SUBTASKS = 3; - private static final String DYNAMIC_TOPIC_NAME = "dynamic_topic"; - private static final String TOPIC1 = "topic"; - private static final String TOPIC2 = "pattern-topic"; - private static final Set<String> PRE_EXISTING_TOPICS = Sets.newHashSet(TOPIC1, TOPIC2); + private static final int READER0 = 0; + private static final int READER1 = 1; + private static final int PARTITION_DISCOVERY_CALLABLE_INDEX = 0; private static final boolean ENABLE_PERIODIC_PARTITION_DISCOVERY = true; private static final boolean DISABLE_PERIODIC_PARTITION_DISCOVERY = false; - private static final boolean INCLUDE_DYNAMIC_TOPIC = true; - private static final boolean EXCLUDE_DYNAMIC_TOPIC = false; - // @TestInstance(TestInstance.Lifecycle.PER_CLASS) is annotated in PulsarTestSuitBase, so this - // method could be non-static. - @BeforeAll - void beforeAll() { - operator().setupTopic(TOPIC1); - operator().setupTopic(TOPIC2); + @ParameterizedTest + @EnumSource( + value = SubscriptionType.class, + names = {"Failover", "Shared"}) + void startWithDiscoverPartitionsOnce(SubscriptionType subscriptionType) throws Exception { + Set<String> prexistingTopics = setupPreexistingTopics(); + try (MockSplitEnumeratorContext<PulsarPartitionSplit> context = + new MockSplitEnumeratorContext<>(NUM_SUBTASKS); + PulsarSourceEnumerator enumerator = + createEnumerator( + subscriptionType, + prexistingTopics, + context, + DISABLE_PERIODIC_PARTITION_DISCOVERY)) { + + // Start the enumerator and it should schedule a one time task to discover and assign + // partitions. 
+ enumerator.start(); + assertThat(context.getPeriodicCallables()).isEmpty(); + assertThat(context.getOneTimeCallables()) + .as("A one time partition discovery callable should have been scheduled") + .hasSize(1); + } } - @AfterAll - void afterAll() { - operator().deleteTopic(TOPIC1, true); - operator().deleteTopic(TOPIC2, true); + @ParameterizedTest + @EnumSource( + value = SubscriptionType.class, + names = {"Failover", "Shared"}) + void startWithPeriodicPartitionDiscovery(SubscriptionType subscriptionType) throws Exception { + Set<String> prexistingTopics = setupPreexistingTopics(); + try (MockSplitEnumeratorContext<PulsarPartitionSplit> context = + new MockSplitEnumeratorContext<>(NUM_SUBTASKS); + PulsarSourceEnumerator enumerator = + createEnumerator( + subscriptionType, + prexistingTopics, + context, + ENABLE_PERIODIC_PARTITION_DISCOVERY)) { + + enumerator.start(); + assertThat(context.getOneTimeCallables()).isEmpty(); + assertThat((context.getPeriodicCallables())) + .as("A periodic partition discovery callable should have been scheduled") + .hasSize(1); + } } - @Test - void startWithDiscoverPartitionsOnce() throws Exception { + @ParameterizedTest + @EnumSource( + value = SubscriptionType.class, + names = {"Failover", "Shared"}) + void discoverPartitionsTriggersAssignments(SubscriptionType subscriptionType) throws Throwable { + Set<String> prexistingTopics = setupPreexistingTopics(); try (MockSplitEnumeratorContext<PulsarPartitionSplit> context = new MockSplitEnumeratorContext<>(NUM_SUBTASKS); PulsarSourceEnumerator enumerator = - createEnumerator(context, DISABLE_PERIODIC_PARTITION_DISCOVERY)) { + createEnumerator( + subscriptionType, + prexistingTopics, + context, + DISABLE_PERIODIC_PARTITION_DISCOVERY)) { - // Start the enumerator and it should schedule a one time task to discover and assign - // partitions. 
enumerator.start(); - assertTrue(context.getPeriodicCallables().isEmpty()); - assertEquals( - 1, - context.getOneTimeCallables().size(), - "A one time partition discovery callable should have been scheduled"); + + // register reader 0, 1 + registerReader(context, enumerator, READER0); + registerReader(context, enumerator, READER1); + assertThat(context.getSplitsAssignmentSequence()).isEmpty(); + + // Run the partition discover callable and check the partition assignment. + runOneTimePartitionDiscovery(context); + verifyLastReadersAssignments(subscriptionType, context, prexistingTopics, 1); } } - @Test - void startWithPeriodicPartitionDiscovery() throws Exception { + @ParameterizedTest + @EnumSource( + value = SubscriptionType.class, + names = {"Failover", "Shared"}) + void discoverPartitionsPeriodically(SubscriptionType subscriptionType) throws Throwable { + String dynamicTopic = randomAlphabetic(10); + Set<String> prexistingTopics = setupPreexistingTopics(); + Set<String> topicsToSubscribe = new HashSet<>(prexistingTopics); + topicsToSubscribe.add(dynamicTopic); try (MockSplitEnumeratorContext<PulsarPartitionSplit> context = new MockSplitEnumeratorContext<>(NUM_SUBTASKS); PulsarSourceEnumerator enumerator = - createEnumerator(context, ENABLE_PERIODIC_PARTITION_DISCOVERY)) { + createEnumerator( + subscriptionType, + topicsToSubscribe, + context, + ENABLE_PERIODIC_PARTITION_DISCOVERY)) { - // Start the enumerator and it should schedule a one time task to discover and assign - // partitions. + testRegisterReadersForPreexistingTopics( + subscriptionType, prexistingTopics, context, enumerator); + + // invoke partition discovery callable again and there should be no new assignments. + runPeriodicPartitionDiscovery(context); + + int expectedSplitsAssignmentSequenceSize = + subscriptionType == SubscriptionType.Failover ? 
1 : 2; + + assertThat(context.getSplitsAssignmentSequence()) + .as("No new assignments should be made because there is no partition change") + .hasSize(expectedSplitsAssignmentSequenceSize); + + // create the dynamic topic. + operator().createTopic(dynamicTopic, PulsarRuntimeOperator.DEFAULT_PARTITIONS); + + // invoke partition discovery callable again. + while (true) { + runPeriodicPartitionDiscovery(context); + if (context.getSplitsAssignmentSequence().size() < 2) { + sleepUninterruptibly(10, TimeUnit.MILLISECONDS); + } else { + break; + } + } + verifyLastReadersAssignments( + subscriptionType, + context, + Collections.singleton(dynamicTopic), + expectedSplitsAssignmentSequenceSize + 1); + } + } + + @ParameterizedTest + @EnumSource( + value = SubscriptionType.class, + names = {"Failover", "Shared"}) + void addSplitsBack(SubscriptionType subscriptionType) throws Throwable { + Set<String> prexistingTopics = setupPreexistingTopics(); + try (MockSplitEnumeratorContext<PulsarPartitionSplit> context = + new MockSplitEnumeratorContext<>(NUM_SUBTASKS); + PulsarSourceEnumerator enumerator = + createEnumerator( + subscriptionType, + prexistingTopics, + context, + ENABLE_PERIODIC_PARTITION_DISCOVERY)) { + + testRegisterReadersForPreexistingTopics( + subscriptionType, prexistingTopics, context, enumerator); + + // Simulate a reader failure. + context.unregisterReader(READER0); + enumerator.addSplitsBack( + context.getSplitsAssignmentSequence().get(0).assignment().get(READER0), + READER0); + int expectedSplitsAssignmentSequenceSize = + subscriptionType == SubscriptionType.Failover ? 1 : 2; + assertThat(context.getSplitsAssignmentSequence()) + .as("The added back splits should have not been assigned") + .hasSize(expectedSplitsAssignmentSequenceSize); + + // Simulate a reader recovery. 
+ registerReader(context, enumerator, READER0); + verifyLastReadersAssignments( + subscriptionType, + context, + prexistingTopics, + expectedSplitsAssignmentSequenceSize + 1); + } + } + + @ParameterizedTest + @EnumSource( + value = SubscriptionType.class, + names = {"Failover"}) + void workWithPreexistingAssignments(SubscriptionType subscriptionType) throws Throwable { + Set<String> prexistingTopics = setupPreexistingTopics(); + PulsarSourceEnumState preexistingAssignments; + try (MockSplitEnumeratorContext<PulsarPartitionSplit> context1 = + new MockSplitEnumeratorContext<>(NUM_SUBTASKS); + PulsarSourceEnumerator enumerator = + createEnumerator( + subscriptionType, + prexistingTopics, + context1, + ENABLE_PERIODIC_PARTITION_DISCOVERY)) { + testRegisterReadersForPreexistingTopics( + subscriptionType, prexistingTopics, context1, enumerator); + preexistingAssignments = + asEnumState(context1.getSplitsAssignmentSequence().get(0).assignment()); + } + + try (MockSplitEnumeratorContext<PulsarPartitionSplit> context2 = + new MockSplitEnumeratorContext<>(NUM_SUBTASKS); + PulsarSourceEnumerator enumerator = + createEnumerator( + subscriptionType, + prexistingTopics, + context2, + ENABLE_PERIODIC_PARTITION_DISCOVERY, + preexistingAssignments)) { enumerator.start(); - assertTrue(context.getOneTimeCallables().isEmpty()); - assertEquals( - 1, - context.getPeriodicCallables().size(), - "A periodic partition discovery callable should have been scheduled"); + runPeriodicPartitionDiscovery(context2); + + registerReader(context2, enumerator, READER0); + verifyLastReadersAssignments(subscriptionType, context2, prexistingTopics, 1); } } - private PulsarSourceEnumerator createEnumerator( + @ParameterizedTest + @EnumSource( + value = SubscriptionType.class, + names = {"Failover", "Shared"}) + void snapshotState(SubscriptionType subscriptionType) throws Throwable { + Set<String> prexistingTopics = setupPreexistingTopics(); + try (MockSplitEnumeratorContext<PulsarPartitionSplit> context = 
+ new MockSplitEnumeratorContext<>(NUM_SUBTASKS); + PulsarSourceEnumerator enumerator = + createEnumerator(subscriptionType, prexistingTopics, context, false)) { + enumerator.start(); + + // No reader is registered, so the state should be empty + final PulsarSourceEnumState state1 = enumerator.snapshotState(1L); + assertThat(state1.getAppendedPartitions()).isEmpty(); + + registerReader(context, enumerator, READER0); + registerReader(context, enumerator, READER1); + runOneTimePartitionDiscovery(context); + + // The state should contain splits assigned to READER0 and READER1 + final PulsarSourceEnumState state2 = enumerator.snapshotState(1L); + verifySplitAssignmentWithPartitions( + getExpectedTopicPartitions(prexistingTopics), state2.getAppendedPartitions()); + } + } + + private Set<String> setupPreexistingTopics() { + String topic1 = randomAlphabetic(10); + String topic2 = randomAlphabetic(10); + operator().setupTopic(topic1); + operator().setupTopic(topic2); + Set<String> preexistingTopics = new HashSet<>(); + preexistingTopics.add(topic1); + preexistingTopics.add(topic2); + return preexistingTopics; + } + + private void testRegisterReadersForPreexistingTopics( + SubscriptionType subscriptionType, + Set<String> topics, MockSplitEnumeratorContext<PulsarPartitionSplit> context, - boolean enablePeriodicPartitionDiscovery) { - return createEnumerator(context, enablePeriodicPartitionDiscovery, EXCLUDE_DYNAMIC_TOPIC); + PulsarSourceEnumerator enumerator) + throws Throwable { + enumerator.start(); + + // register reader 0 before the partition discovery. + registerReader(context, enumerator, READER0); + assertThat(context.getSplitsAssignmentSequence()).isEmpty(); + + // Run the partition discover callable and check the partition assignment. 
+ runPeriodicPartitionDiscovery(context); + verifyLastReadersAssignments(subscriptionType, context, topics, 1); + + registerReader(context, enumerator, READER1); + + int expectedSplitsAssignmentSequenceSize = + subscriptionType == SubscriptionType.Failover ? 1 : 2; + verifyLastReadersAssignments( + subscriptionType, context, topics, expectedSplitsAssignmentSequenceSize); } private PulsarSourceEnumerator createEnumerator( + SubscriptionType subscriptionType, + Set<String> topics, MockSplitEnumeratorContext<PulsarPartitionSplit> enumContext, - boolean enablePeriodicPartitionDiscovery, - boolean includeDynamicTopic) { - List<String> topics = new ArrayList<>(PRE_EXISTING_TOPICS); - if (includeDynamicTopic) { - topics.add(DYNAMIC_TOPIC_NAME); - } - Configuration configuration = operator().config(); - configuration.set(PULSAR_SUBSCRIPTION_TYPE, SubscriptionType.Failover); - + boolean enablePeriodicPartitionDiscovery) { PulsarSourceEnumState sourceEnumState = new PulsarSourceEnumState( Sets.newHashSet(), @@ -136,40 +345,38 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase { Maps.newHashMap(), Maps.newHashMap(), false); - return createEnumerator( + subscriptionType, + topics, enumContext, enablePeriodicPartitionDiscovery, - topics, - sourceEnumState, - configuration); + sourceEnumState); } - /** - * Create the enumerator. For the purpose of the tests in this class we don't care about the - * subscriber and offsets initializer, so just use arbitrary settings. - */ private PulsarSourceEnumerator createEnumerator( + SubscriptionType subscriptionType, + Set<String> topicsToSubscribe, MockSplitEnumeratorContext<PulsarPartitionSplit> enumContext, boolean enablePeriodicPartitionDiscovery, - Collection<String> topicsToSubscribe, - PulsarSourceEnumState sourceEnumState, - Configuration configuration) { + PulsarSourceEnumState sourceEnumState) { // Use a TopicPatternSubscriber so that no exception if a subscribed topic hasn't been // created yet. 
String topicRegex = String.join("|", topicsToSubscribe); Pattern topicPattern = Pattern.compile(topicRegex); PulsarSubscriber subscriber = getTopicPatternSubscriber(topicPattern, RegexSubscriptionMode.AllTopics); + + Configuration configuration = operator().config(); + configuration.set(PULSAR_SUBSCRIPTION_TYPE, subscriptionType); if (enablePeriodicPartitionDiscovery) { configuration.set(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, 60L); } else { configuration.set(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, -1L); } SourceConfiguration sourceConfiguration = new SourceConfiguration(configuration); + SplitsAssignmentState assignmentState = new SplitsAssignmentState(latest(), sourceConfiguration, sourceEnumState); - return new PulsarSourceEnumerator( subscriber, StartCursor.earliest(), @@ -179,4 +386,102 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase { enumContext, assignmentState); } + + /** Registers a mock reader with the context, then calls {@code enumerator.addReader} for it. */ private void registerReader( + MockSplitEnumeratorContext<PulsarPartitionSplit> context, + PulsarSourceEnumerator enumerator, + int reader) { + context.registerReader(new ReaderInfo(reader, "testing location ")); + enumerator.addReader(reader); + } + + /** Asserts how many split assignments were recorded, then checks the most recent one via {@code verifyAssignments} against every partition of {@code topics}. */ private void verifyLastReadersAssignments( + SubscriptionType subscriptionType, + MockSplitEnumeratorContext<PulsarPartitionSplit> context, + Set<String> topics, + int expectedAssignmentSeqSize) { + assertThat(context.getSplitsAssignmentSequence()).hasSize(expectedAssignmentSeqSize); + verifyAssignments( + subscriptionType, + getExpectedTopicPartitions(topics), + context.getSplitsAssignmentSequence() + .get(expectedAssignmentSeqSize - 1) + .assignment()); + } + + /** Size-only check (partition identities are not compared): for Failover the splits summed over all readers must equal the expected partition count; for Shared every reader must hold that many splits. 
Other subscription types are not checked at all. */ private void verifyAssignments( + SubscriptionType subscriptionType, + Set<TopicPartition> expectedTopicPartitions, + Map<Integer, List<PulsarPartitionSplit>> actualAssignments) { + if (subscriptionType == SubscriptionType.Failover) { + int actualSize = actualAssignments.values().stream().mapToInt(List::size).sum(); +
assertThat(actualSize).isEqualTo(expectedTopicPartitions.size()); + } else if (subscriptionType == SubscriptionType.Shared) { + actualAssignments + .values() + .forEach( + (splits) -> assertThat(splits).hasSize(expectedTopicPartitions.size())); + } + } + + /** Returns a full-range {@link TopicPartition} for partition indexes 0 (inclusive) to {@code DEFAULT_PARTITIONS} (exclusive) of each topic. */ private Set<TopicPartition> getExpectedTopicPartitions(Set<String> topics) { + Set<TopicPartition> allPartitions = new HashSet<>(); + for (String topicName : topics) { + for (int i = 0; i < PulsarRuntimeOperator.DEFAULT_PARTITIONS; i++) { + allPartitions.add(new TopicPartition(topicName, i, TopicRange.createFullRange())); + } + } + return allPartitions; + } + + /** Asserts that the actual partition set equals the expected one. */ private void verifySplitAssignmentWithPartitions( + Set<TopicPartition> expectedAssignment, Set<TopicPartition> actualTopicPartitions) { + assertThat(actualTopicPartitions).isEqualTo(expectedAssignment); + } + + // Builds an enum state in which every assigned split is both appended and pending, with an empty shared-pending map, no reader-assigned splits and initialized=false; this method only works for non Shared Mode. + private PulsarSourceEnumState asEnumState( + Map<Integer, List<PulsarPartitionSplit>> assignments) { + Set<TopicPartition> appendedPartitions = new HashSet<>(); + Set<PulsarPartitionSplit> pendingPartitionSplits = new HashSet<>(); + Map<Integer, Set<PulsarPartitionSplit>> sharedPendingPartitionSplits = new HashMap<>(); + Map<Integer, Set<String>> readerAssignedSplits = new HashMap<>(); + boolean initialized = false; + + assignments + .values() + .forEach( + splits -> { + appendedPartitions.addAll( + splits.stream() + .map(PulsarPartitionSplit::getPartition) + .collect(Collectors.toList())); + pendingPartitionSplits.addAll(splits); + }); + + return new PulsarSourceEnumState( + appendedPartitions, + pendingPartitionSplits, + sharedPendingPartitionSplits, + readerAssignedSplits, + initialized); + } + + /** Runs the next one-time callable, then one more one-time callable if another was queued. 
*/ private void runOneTimePartitionDiscovery( + MockSplitEnumeratorContext<PulsarPartitionSplit> context) throws Throwable { + // Fetch potential topic descriptions + context.runNextOneTimeCallable(); + if (!context.getOneTimeCallables().isEmpty()) { + context.runNextOneTimeCallable(); + } + } + + /** Runs the periodic discovery callable, then a queued one-time callable if there is one. */ private
void runPeriodicPartitionDiscovery( + MockSplitEnumeratorContext<PulsarPartitionSplit> context) throws Throwable { + // Fetch potential topic descriptions + context.runPeriodicCallable(PARTITION_DISCOVERY_CALLABLE_INDEX); + if (!context.getOneTimeCallables().isEmpty()) { + context.runNextOneTimeCallable(); + } + } } diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReaderTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReaderTest.java new file mode 100644 index 0000000..477dfdd --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReaderTest.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.connector.pulsar.source.reader.source; + +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils; +import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; +import org.apache.flink.connector.pulsar.testutils.extension.SubType; +import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.core.testutils.CommonTestUtils; + +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.policies.data.SubscriptionStats; +import org.apache.pulsar.common.policies.data.TopicStats; +import org.junit.jupiter.api.TestTemplate; + +import java.time.Duration; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; + +import static org.apache.flink.connector.pulsar.testutils.PulsarTestCommonUtils.createPartitionSplit; +import static org.apache.flink.connector.pulsar.testutils.PulsarTestCommonUtils.createPartitionSplits; +import static org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator.DEFAULT_PARTITIONS; +import static org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator.NUM_RECORDS_PER_PARTITION; +import static org.apache.flink.shaded.guava30.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; + +/** Tests for {@link PulsarOrderedSourceReader} using a Failover subscription. */ class PulsarOrderedSourceReaderTest extends PulsarSourceReaderTestBase { + + private static final int MAX_EMPTY_POLLING_TIMES = 10; /* total NOTHING_AVAILABLE polls (never reset) tolerated before the polling loop gives up */ + + @SubType SubscriptionType subscriptionType = SubscriptionType.Failover; + +
/** Consumes partition 0, completes checkpoint 100 and verifies that every record was acknowledged. */ @TestTemplate + void consumeMessagesAndCommitOffsets( + PulsarSourceReaderBase<Integer> baseReader, Boundedness boundedness, String topicName) + throws Exception { + // set up the partition + PulsarOrderedSourceReader<Integer> reader = (PulsarOrderedSourceReader<Integer>) baseReader; + setupSourceReader(reader, topicName, 0, Boundedness.CONTINUOUS_UNBOUNDED); + + // waiting for results + TestingReaderOutput<Integer> output = new TestingReaderOutput<>(); + pollUntil( + reader, + output, + () -> output.getEmittedRecords().size() == NUM_RECORDS_PER_PARTITION, + "The output didn't poll enough records before timeout."); + reader.snapshotState(100L); + reader.notifyCheckpointComplete(100L); + pollUntil( + reader, + output, + reader.cursorsToCommit::isEmpty, + "The offset commit did not finish before timeout."); + + // verify consumption + reader.close(); + verifyAllMessageAcknowledged( + NUM_RECORDS_PER_PARTITION, TopicNameUtils.topicNameWithPartition(topicName, 0)); + } + + /** Snapshots one checkpoint per poll over all partitions, then verifies that completing only the last checkpoint commits every pending cursor. */ @TestTemplate + void offsetCommitOnCheckpointComplete( + PulsarSourceReaderBase<Integer> baseReader, Boundedness boundedness, String topicName) + throws Exception { + PulsarOrderedSourceReader<Integer> reader = (PulsarOrderedSourceReader<Integer>) baseReader; + // consume more than 1 partition + reader.addSplits( + createPartitionSplits( + topicName, DEFAULT_PARTITIONS, Boundedness.CONTINUOUS_UNBOUNDED)); + reader.notifyNoMoreSplits(); + TestingReaderOutput<Integer> output = new TestingReaderOutput<>(); + long checkpointId = 0; + int emptyResultTime = 0; + InputStatus status; + do { + checkpointId++; + status = reader.pollNext(output); + // Create a checkpoint for each message consumption, but not complete them.
+ reader.snapshotState(checkpointId); + // the first couple of pollNext() might return NOTHING_AVAILABLE before data appears + if (InputStatus.NOTHING_AVAILABLE == status) { + emptyResultTime++; + sleepUninterruptibly(1, TimeUnit.SECONDS); + } + + } while (emptyResultTime < MAX_EMPTY_POLLING_TIMES + && status != InputStatus.END_OF_INPUT + && output.getEmittedRecords().size() + < NUM_RECORDS_PER_PARTITION * DEFAULT_PARTITIONS); + + // The completion of the last checkpoint should subsume all previous checkpoints. + assertThat(reader.cursorsToCommit).hasSize((int) checkpointId); + long lastCheckpointId = checkpointId; + // notify checkpoint complete and expect all cursors committed + assertThatCode(() -> reader.notifyCheckpointComplete(lastCheckpointId)) + .doesNotThrowAnyException(); + assertThat(reader.cursorsToCommit).isEmpty(); + + // Verify the committed offsets. + reader.close(); + for (int i = 0; i < DEFAULT_PARTITIONS; i++) { + verifyAllMessageAcknowledged( + NUM_RECORDS_PER_PARTITION, TopicNameUtils.topicNameWithPartition(topicName, i)); + } + } + + /** Adds a single split for {@code partitionId} and signals that no more splits will follow. */ private void setupSourceReader( + PulsarSourceReaderBase<Integer> reader, + String topicName, + int partitionId, + Boundedness boundedness) { + PulsarPartitionSplit split = createPartitionSplit(topicName, partitionId, boundedness); + reader.addSplits(Collections.singletonList(split)); + reader.notifyNoMoreSplits(); + } + + /** Polls until {@code expectedRecords} records were emitted, closes the reader and verifies acknowledgement; not called anywhere in this class. 
*/ private void pollUntilReadExpectedNumberOfRecordsAndValidate( + PulsarSourceReaderBase<Integer> reader, + TestingReaderOutput<Integer> output, + int expectedRecords, + String topicNameWithPartition) + throws Exception { + pollUntil( + reader, + output, + () -> output.getEmittedRecords().size() == expectedRecords, + "The output didn't poll enough records before timeout."); + reader.close(); + verifyAllMessageAcknowledged(expectedRecords, topicNameWithPartition); + assertThat(output.getEmittedRecords()).hasSize(expectedRecords); + } + + /** Calls {@code pollNext} repeatedly until {@code condition} holds; fails with {@code errorMessage} once the bounded wait expires. */ private void pollUntil( + PulsarSourceReaderBase<Integer>
reader, + ReaderOutput<Integer> output, + Supplier<Boolean> condition, + String errorMessage) + throws InterruptedException, TimeoutException { + CommonTestUtils.waitUtil( + () -> { + try { + reader.pollNext(output); + } catch (Exception exception) { + throw new RuntimeException( + "Caught unexpected exception when polling from the reader", + exception); + } + return condition.get(); + }, + Duration.ofMinutes(5), /* bounded: a stuck reader must fail with errorMessage instead of hanging the build */ + errorMessage); + } + + /** Asserts the partition has at least one subscription and that each one has no unacked messages and delivered exactly {@code expectedMessages}. */ private void verifyAllMessageAcknowledged(int expectedMessages, String partitionName) + throws PulsarAdminException { + TopicStats topicStats = operator().admin().topics().getStats(partitionName, true, true); + // verify that every subscription has acknowledged all messages it delivered + Map<String, ? extends SubscriptionStats> subscriptionStats = topicStats.getSubscriptions(); + assertThat(subscriptionStats).hasSizeGreaterThan(0); + subscriptionStats.forEach( + (subscription, stats) -> { + assertThat(stats.getUnackedMessages()).isZero(); + assertThat(stats.getMsgOutCounter()).isEqualTo(expectedMessages); + }); + } +} diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderTestBase.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderTestBase.java new file mode 100644 index 0000000..339027c --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderTestBase.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source.reader.source; + +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils; +import org.apache.flink.connector.pulsar.source.reader.PulsarSourceReaderFactory; +import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema; +import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchemaInitializationContext; +import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; +import org.apache.flink.connector.pulsar.testutils.PulsarTestSuiteBase; +import org.apache.flink.connector.pulsar.testutils.extension.TestOrderlinessExtension; +import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator; +import org.apache.flink.connector.testutils.source.reader.TestingReaderContext; +import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput; +import org.apache.flink.connectors.test.common.junit.extensions.TestLoggerExtension; +import org.apache.flink.core.io.InputStatus; + +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; +import 
org.apache.pulsar.common.policies.data.SubscriptionStats; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.Extension; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.ParameterContext; +import org.junit.jupiter.api.extension.ParameterResolutionException; +import org.junit.jupiter.api.extension.ParameterResolver; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.extension.TestTemplateInvocationContext; +import org.junit.jupiter.api.extension.TestTemplateInvocationContextProvider; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.stream.Stream; + +import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_TIME; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE; +import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema; +import static org.apache.flink.connector.pulsar.testutils.PulsarTestCommonUtils.createPartitionSplit; +import static org.apache.flink.connector.pulsar.testutils.PulsarTestCommonUtils.isAssignableFromParameterContext; +import static org.apache.flink.connector.pulsar.testutils.extension.TestOrderlinessExtension.PULSAR_SOURCE_READER_SUBSCRIPTION_TYPE_STORE_KEY; +import 
static org.apache.flink.connector.pulsar.testutils.extension.TestOrderlinessExtension.PULSAR_TEST_RESOURCE_NAMESPACE; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; + +@ExtendWith({ + TestOrderlinessExtension.class, + TestLoggerExtension.class, +}) +/** Shared Pulsar source reader test cases; every template runs once per invocation context supplied by {@link PulsarSourceReaderInvocationContextProvider}. */ abstract class PulsarSourceReaderTestBase extends PulsarTestSuiteBase { + + @RegisterExtension + PulsarSourceReaderInvocationContextProvider provider = + new PulsarSourceReaderInvocationContextProvider(); + + /** Creates the per-invocation topic, filled with random INT32 values in [0, 20). */ @BeforeEach + void beforeEach(String topicName) { + Random random = new Random(System.currentTimeMillis()); + operator().setupTopic(topicName, Schema.INT32, () -> random.nextInt(20)); + } + + /** Deletes the per-invocation topic. */ @AfterEach + void afterEach(String topicName) { + operator().deleteTopic(topicName, true); + } + + /** A reader without splits must not create a subscription on any partition, even across a completed checkpoint. */ @TestTemplate + void assignZeroSplitsCreatesZeroSubscription( + PulsarSourceReaderBase<Integer> reader, Boundedness boundedness, String topicName) + throws Exception { + reader.snapshotState(100L); + reader.notifyCheckpointComplete(100L); + // Verify that no subscription was created on any partition.
+ reader.close(); + for (int i = 0; i < PulsarRuntimeOperator.DEFAULT_PARTITIONS; i++) { + verifyNoSubscriptionCreated(TopicNameUtils.topicNameWithPartition(topicName, i)); + } + } + + /** A split starting at {@link MessageId#latest} must yield NOTHING_AVAILABLE on the first poll. */ @TestTemplate + void assigningEmptySplits( + PulsarSourceReaderBase<Integer> reader, Boundedness boundedness, String topicName) + throws Exception { + final PulsarPartitionSplit emptySplit = + createPartitionSplit( + topicName, 0, Boundedness.CONTINUOUS_UNBOUNDED, MessageId.latest); + + reader.addSplits(Collections.singletonList(emptySplit)); + + TestingReaderOutput<Integer> output = new TestingReaderOutput<>(); + InputStatus status = reader.pollNext(output); + assertThat(status).isEqualTo(InputStatus.NOTHING_AVAILABLE); + reader.close(); + } + + /** Asserts that the given partition has no subscriptions. */ private void verifyNoSubscriptionCreated(String partitionName) throws PulsarAdminException { + Map<String, ? extends SubscriptionStats> subscriptionStats = + operator().admin().topics().getStats(partitionName, true, true).getSubscriptions(); + assertThat(subscriptionStats).isEmpty(); + } + + /** Builds a reader that fetches at most one record per fetch (1s max fetch time) under a random subscription name; auto-acknowledge is forced on for Shared subscriptions. */ private PulsarSourceReaderBase<Integer> sourceReader( + boolean autoAcknowledgementEnabled, SubscriptionType subscriptionType) { + Configuration configuration = operator().config(); + configuration.set(PULSAR_MAX_FETCH_RECORDS, 1); + configuration.set(PULSAR_MAX_FETCH_TIME, 1000L); + configuration.set(PULSAR_SUBSCRIPTION_NAME, randomAlphabetic(10)); + configuration.set(PULSAR_SUBSCRIPTION_TYPE, subscriptionType); + if (autoAcknowledgementEnabled + || configuration.get(PULSAR_SUBSCRIPTION_TYPE) == SubscriptionType.Shared) { + configuration.set(PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true); + } + PulsarDeserializationSchema<Integer> 
deserializationSchema = pulsarSchema(Schema.INT32); + SourceReaderContext context = new TestingReaderContext(); + try { + deserializationSchema.open( + new PulsarDeserializationSchemaInitializationContext(context)); + } catch (Exception e) { + fail("Error while opening deserializationSchema", e); /* keep the cause so the failure is diagnosable */ + } + + SourceConfiguration
sourceConfiguration = new SourceConfiguration(configuration); + return (PulsarSourceReaderBase<Integer>) + PulsarSourceReaderFactory.create( + context, deserializationSchema, configuration, sourceConfiguration); + } + + /** Supplies two invocation contexts (auto-ack requested / not requested) for the subscription type read from the extension store. */ public class PulsarSourceReaderInvocationContextProvider + implements TestTemplateInvocationContextProvider { + + @Override + public boolean supportsTestTemplate(ExtensionContext context) { + return true; + } + + @Override + public Stream<TestTemplateInvocationContext> provideTestTemplateInvocationContexts( + ExtensionContext context) { + SubscriptionType subscriptionType = + (SubscriptionType) + context.getStore(PULSAR_TEST_RESOURCE_NAMESPACE) + .get(PULSAR_SOURCE_READER_SUBSCRIPTION_TYPE_STORE_KEY); + return Stream.of( + new PulsarSourceReaderInvocationContext( + sourceReader(true, subscriptionType), Boundedness.CONTINUOUS_UNBOUNDED), + new PulsarSourceReaderInvocationContext( + sourceReader(false, subscriptionType), + Boundedness.CONTINUOUS_UNBOUNDED)); + } + } + + /** Resolves the reader, the boundedness and a random 5-letter topic name as test-method parameters. */ public static class PulsarSourceReaderInvocationContext + implements TestTemplateInvocationContext { + + private final PulsarSourceReaderBase<?> sourceReader; + private final Boundedness boundedness; + private final String randomTopicName; + + public PulsarSourceReaderInvocationContext( + PulsarSourceReaderBase<?> splitReader, Boundedness boundedness) { + this.sourceReader = checkNotNull(splitReader); + this.boundedness = checkNotNull(boundedness); + this.randomTopicName = randomAlphabetic(5); + } + + @Override + public String getDisplayName(int invocationIndex) { + return "AutoAckEnabled: " + + sourceReader.sourceConfiguration.isEnableAutoAcknowledgeMessage() + + " Boundedness: " + + boundedness.toString(); + } + + @Override + public List<Extension> getAdditionalExtensions() { 
+ return Arrays.asList( + new ParameterResolver() { + @Override + public boolean supportsParameter( + ParameterContext parameterContext, + ExtensionContext extensionContext) + throws ParameterResolutionException
{ + return isAssignableFromParameterContext( + PulsarSourceReaderBase.class, parameterContext) + || isAssignableFromParameterContext( + Boundedness.class, parameterContext) + || isAssignableFromParameterContext( + String.class, parameterContext); + } + + @Override + public Object resolveParameter( + ParameterContext parameterContext, + ExtensionContext extensionContext) + throws ParameterResolutionException { + if (parameterContext + .getParameter() + .getType() + .equals(PulsarSourceReaderBase.class)) { + return sourceReader; + } else if (parameterContext + .getParameter() + .getType() + .equals(Boundedness.class)) { + return boundedness; + } else { /* any other supported parameter is the String topic name */ + return randomTopicName; + } + } + }); + } + } +} diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReaderTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReaderTest.java similarity index 52% copy from flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReaderTest.java copy to flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReaderTest.java index a94504a..4f7fdd3 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReaderTest.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReaderTest.java @@ -16,22 +16,12 @@ * limitations under the License.
*/ -package org.apache.flink.connector.pulsar.source.reader.split; +package org.apache.flink.connector.pulsar.source.reader.source; -import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.connector.pulsar.testutils.extension.SubType; -import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.flinkSchema; +import org.apache.pulsar.client.api.SubscriptionType; -/** Unit tests for {@link PulsarOrderedPartitionSplitReaderTest}. */ -class PulsarOrderedPartitionSplitReaderTest extends PulsarPartitionSplitReaderTestBase { - - @Override - protected PulsarPartitionSplitReaderBase<String> splitReader() { - return new PulsarOrderedPartitionSplitReader<>( - operator().client(), - operator().admin(), - readerConfig(), - sourceConfig(), - flinkSchema(new SimpleStringSchema())); - } +/** Runs the {@link PulsarSourceReaderTestBase} cases with a {@code Shared} subscription. */ class PulsarUnorderedSourceReaderTest extends PulsarSourceReaderTestBase { + @SubType SubscriptionType subscriptionType = SubscriptionType.Shared; } diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReaderTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReaderTest.java index a94504a..3d58d5e 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReaderTest.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReaderTest.java @@ -18,20 +18,74 @@ package org.apache.flink.connector.pulsar.source.reader.split; -import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.connector.pulsar.testutils.extension.SubType; -import static
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.flinkSchema; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.junit.jupiter.api.TestTemplate; + +import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic; +import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyAdmin; +import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition; +import static org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator.NUM_RECORDS_PER_PARTITION; +import static org.apache.pulsar.client.api.Schema.STRING; /** Unit tests for {@link PulsarOrderedPartitionSplitReaderTest}. */ class PulsarOrderedPartitionSplitReaderTest extends PulsarPartitionSplitReaderTestBase { - @Override - protected PulsarPartitionSplitReaderBase<String> splitReader() { - return new PulsarOrderedPartitionSplitReader<>( - operator().client(), - operator().admin(), - readerConfig(), - sourceConfig(), - flinkSchema(new SimpleStringSchema())); + @SubType SubscriptionType subscriptionType = SubscriptionType.Failover; + + /** Split without a start position on a pre-filled topic: expects 0 fetched messages. */ @TestTemplate + void consumeMessageCreatedBeforeHandleSplitsChangesWithoutSeek( + PulsarPartitionSplitReaderBase<String> splitReader) { + String topicName = randomAlphabetic(10); + operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10)); + handleSplit(splitReader, topicName, 0); + fetchedMessages(splitReader, 0, true); + } + + /** Start position {@link MessageId#latest} on a pre-filled topic: expects 0 fetched messages. 
*/ @TestTemplate + void consumeMessageCreatedBeforeHandleSplitsChangesAndUseLatestStartCursorWithoutSeek( + PulsarPartitionSplitReaderBase<String> splitReader) { + String topicName = randomAlphabetic(10); + operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10)); + handleSplit(splitReader, topicName, 0, MessageId.latest); + fetchedMessages(splitReader, 0, true); + } + + /** Start position {@link MessageId#earliest}: expects all {@code NUM_RECORDS_PER_PARTITION} messages. */ @TestTemplate + void
consumeMessageCreatedBeforeHandleSplitsChangesAndUseEarliestStartCursorWithoutSeek( + PulsarPartitionSplitReaderBase<String> splitReader) { + String topicName = randomAlphabetic(10); + operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10)); + handleSplit(splitReader, topicName, 0, MessageId.earliest); + fetchedMessages(splitReader, NUM_RECORDS_PER_PARTITION, true); + } + + /** Start position one entry before the last message: expects exactly one fetched message. */ @TestTemplate + void consumeMessageCreatedBeforeHandleSplitsChangesAndUseSecondLastMessageWithoutSeek( + PulsarPartitionSplitReaderBase<String> splitReader) { + String topicName = randomAlphabetic(10); + operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10)); + MessageIdImpl lastMessageId = + (MessageIdImpl) + sneakyAdmin( + () -> + operator() + .admin() + .topics() + .getLastMessageId( + topicNameWithPartition(topicName, 0))); + // On recovery the start position is exclusive, so starting one entry before the last message should yield only the last message. + handleSplit( + splitReader, + topicName, + 0, + new MessageIdImpl( + lastMessageId.getLedgerId(), + lastMessageId.getEntryId() - 1, + lastMessageId.getPartitionIndex())); + fetchedMessages(splitReader, 1, true); } } diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java index fb9fa01..0c0c0ba 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java @@ -18,17 +18,28 @@ package org.apache.flink.connector.pulsar.source.reader.split; +import 
org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.connector.source.Boundedness; +import
org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition; import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; +import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor; +import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage; import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; import org.apache.flink.connector.pulsar.testutils.PulsarTestSuiteBase; +import org.apache.flink.connector.pulsar.testutils.extension.TestOrderlinessExtension; +import org.apache.flink.connectors.test.common.junit.extensions.TestLoggerExtension; -import org.junit.jupiter.api.DisplayName; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.MessageIdImpl; import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.Extension; import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.api.extension.ParameterContext; @@ -39,78 +50,164 @@ import org.junit.jupiter.api.extension.TestTemplateInvocationContext; import org.junit.jupiter.api.extension.TestTemplateInvocationContextProvider; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Stream; import static java.time.Duration.ofSeconds; import static 
java.util.Collections.singletonList; import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic; +import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyAdmin; +import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyThrow; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_TIME; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME; -import static org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor.never; import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition; import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange; +import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.flinkSchema; +import static org.apache.flink.connector.pulsar.testutils.PulsarTestCommonUtils.isAssignableFromParameterContext; +import static org.apache.flink.connector.pulsar.testutils.extension.TestOrderlinessExtension.PULSAR_SOURCE_READER_SUBSCRIPTION_TYPE_STORE_KEY; +import static org.apache.flink.connector.pulsar.testutils.extension.TestOrderlinessExtension.PULSAR_TEST_RESOURCE_NAMESPACE; +import static org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator.DEFAULT_PARTITIONS; +import static org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator.NUM_RECORDS_PER_PARTITION; import static org.apache.flink.core.testutils.CommonTestUtils.waitUtil; +import static org.apache.flink.shaded.guava30.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; import static 
org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.pulsar.client.api.Schema.STRING; -import static org.junit.jupiter.api.Assertions.assertNull; +import static org.assertj.core.api.Assertions.assertThat; /** Test utils for split readers. */ +@ExtendWith({ + TestOrderlinessExtension.class, + TestLoggerExtension.class, +}) public abstract class PulsarPartitionSplitReaderTestBase extends PulsarTestSuiteBase { @RegisterExtension PulsarSplitReaderInvocationContextProvider provider = new PulsarSplitReaderInvocationContextProvider(); - protected Configuration readerConfig() { + /** Default reader config: at most 1 record per fetch, 1s fetch timeout, random subscription name, auto-acknowledge enabled. */ + private Configuration readerConfig() { Configuration config = operator().config(); config.set(PULSAR_MAX_FETCH_RECORDS, 1); config.set(PULSAR_MAX_FETCH_TIME, 1000L); config.set(PULSAR_SUBSCRIPTION_NAME, randomAlphabetic(10)); config.set(PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true); - return config; } - protected SourceConfiguration sourceConfig() { + /** Wraps a fresh {@code readerConfig()} in a {@link SourceConfiguration}. */ private SourceConfiguration sourceConfig() { return new SourceConfiguration(readerConfig()); } - protected SplitsAddition<PulsarPartitionSplit> createSplit(String topicName, int partitionId) { + /** Adds a full-range split for the partition with a {@code null} start position. */ protected void handleSplit( + PulsarPartitionSplitReaderBase<String> reader, String topicName, int partitionId) { + handleSplit(reader, topicName, partitionId, null); + } + + /** Adds a full-range, never-stopping split starting at {@code startPosition} to the reader. 
*/ protected void handleSplit( + PulsarPartitionSplitReaderBase<String> reader, + String topicName, + int partitionId, + MessageId startPosition) { + TopicPartition partition = new TopicPartition(topicName, partitionId, createFullRange()); + PulsarPartitionSplit split = + new PulsarPartitionSplit(partition, StopCursor.never(), startPosition, null); + SplitsAddition<PulsarPartitionSplit> addition = new SplitsAddition<>(singletonList(split)); + reader.handleSplitsChanges(addition); + } + + /** Seeks to {@link MessageId#latest} before adding the split. */ private void seekStartPositionAndHandleSplit( + PulsarPartitionSplitReaderBase<String> reader, String topicName, int
partitionId) { + seekStartPositionAndHandleSplit(reader, topicName, partitionId, MessageId.latest); + } + + /** Seeks a temporary consumer (closed by try-with-resources) to {@code startPosition}, then adds a split that has no start position of its own. */ private void seekStartPositionAndHandleSplit( + PulsarPartitionSplitReaderBase<String> reader, + String topicName, + int partitionId, + MessageId startPosition) { TopicPartition partition = new TopicPartition(topicName, partitionId, createFullRange()); - PulsarPartitionSplit split = new PulsarPartitionSplit(partition, never()); - return new SplitsAddition<>(singletonList(split)); + PulsarPartitionSplit split = + new PulsarPartitionSplit(partition, StopCursor.never(), null, null); + SplitsAddition<PulsarPartitionSplit> addition = new SplitsAddition<>(singletonList(split)); + + // create consumer and seek before split changes + try (Consumer<byte[]> consumer = reader.createPulsarConsumer(partition)) { + // inclusive messageId + StartCursor startCursor = StartCursor.fromMessageId(startPosition); + startCursor.seekPosition(partition.getTopic(), partition.getPartitionId(), consumer); + } catch (PulsarClientException e) { + sneakyThrow(e); + } + + reader.handleSplitsChanges(addition); + } + + /** Returns the first message of a one-message, unverified fetch, or {@code null} if nothing was fetched. */ private <T> PulsarMessage<T> fetchedMessage(PulsarPartitionSplitReaderBase<T> splitReader) { + return fetchedMessages(splitReader, 1, false).stream().findFirst().orElse(null); + } + + /** Delegates to the four-argument overload with {@link Boundedness#CONTINUOUS_UNBOUNDED}. 
*/ protected <T> List<PulsarMessage<T>> fetchedMessages( + PulsarPartitionSplitReaderBase<T> splitReader, int expectedCount, boolean verify) { + return fetchedMessages( + splitReader, expectedCount, verify, Boundedness.CONTINUOUS_UNBOUNDED); } - protected <T> PulsarMessage<T> fetchedMessage(PulsarPartitionSplitReaderBase<T> splitReader) { - try { - RecordsWithSplitIds<PulsarMessage<T>> records = splitReader.fetch(); - if (records.nextSplit() != null) { - return records.nextRecordFromSplit(); + private <T> List<PulsarMessage<T>> fetchedMessages( + PulsarPartitionSplitReaderBase<T> splitReader, + int expectedCount, + boolean verify, + Boundedness boundedness) { + List<PulsarMessage<T>> messages = new
ArrayList<>(expectedCount); + List<String> finishedSplits = new ArrayList<>(); + for (int i = 0; i < 3; ) { + try { + RecordsWithSplitIds<PulsarMessage<T>> recordsBySplitIds = splitReader.fetch(); + if (recordsBySplitIds.nextSplit() != null) { + // Collect the records in this split. + PulsarMessage<T> record; + while ((record = recordsBySplitIds.nextRecordFromSplit()) != null) { + messages.add(record); + } + finishedSplits.addAll(recordsBySplitIds.finishedSplits()); + } else { + i++; + } + } catch (IOException e) { + i++; + } + sleepUninterruptibly(1, TimeUnit.SECONDS); + } + if (verify) { + assertThat(messages).as("We should fetch the expected size").hasSize(expectedCount); + if (boundedness == Boundedness.CONTINUOUS_UNBOUNDED) { + assertThat(finishedSplits).as("Split should not be marked as finished").hasSize(0); } else { - return null; + assertThat(finishedSplits).as("Split should be marked as finished").hasSize(1); } - } catch (IOException e) { - return null; } + + return messages; } @TestTemplate - @DisplayName("Retrieve message after timeout by using given split reader") void pollMessageAfterTimeout(PulsarPartitionSplitReaderBase<String> splitReader) throws InterruptedException, TimeoutException { String topicName = randomAlphabetic(10); // Add a split - splitReader.handleSplitsChanges(createSplit(topicName, 0)); + seekStartPositionAndHandleSplit(splitReader, topicName, 0); // Poll once with a null message PulsarMessage<String> message1 = fetchedMessage(splitReader); - assertNull(message1); + assertThat(message1).isNull(); // Send a message to pulsar String topic = topicNameWithPartition(topicName, 0); @@ -122,14 +219,139 @@ public abstract class PulsarPartitionSplitReaderTestBase extends PulsarTestSuite PulsarMessage<String> message2 = fetchedMessage(splitReader); return message2 != null; }, - ofSeconds(10), + ofSeconds(Integer.MAX_VALUE), "Couldn't poll message from Pulsar."); } + @TestTemplate + void consumeMessageCreatedAfterHandleSplitChangesAndFetch( + 
PulsarPartitionSplitReaderBase<String> splitReader) { + String topicName = randomAlphabetic(10); + seekStartPositionAndHandleSplit(splitReader, topicName, 0); + operator().sendMessage(topicNameWithPartition(topicName, 0), STRING, randomAlphabetic(10)); + fetchedMessages(splitReader, 1, true); + } + + @TestTemplate + void consumeMessageCreatedBeforeHandleSplitsChanges( + PulsarPartitionSplitReaderBase<String> splitReader) { + String topicName = randomAlphabetic(10); + operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10)); + seekStartPositionAndHandleSplit(splitReader, topicName, 0); + fetchedMessages(splitReader, 0, true); + } + + @TestTemplate + void consumeMessageCreatedBeforeHandleSplitsChangesAndResetToEarliestPosition( + PulsarPartitionSplitReaderBase<String> splitReader) { + String topicName = randomAlphabetic(10); + operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10)); + seekStartPositionAndHandleSplit(splitReader, topicName, 0, MessageId.earliest); + fetchedMessages(splitReader, NUM_RECORDS_PER_PARTITION, true); + } + + @TestTemplate + void consumeMessageCreatedBeforeHandleSplitsChangesAndResetToLatestPosition( + PulsarPartitionSplitReaderBase<String> splitReader) { + String topicName = randomAlphabetic(10); + operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10)); + seekStartPositionAndHandleSplit(splitReader, topicName, 0, MessageId.latest); + fetchedMessages(splitReader, 0, true); + } + + @TestTemplate + void consumeMessageCreatedBeforeHandleSplitsChangesAndUseSecondLastMessageIdCursor( + PulsarPartitionSplitReaderBase<String> splitReader) { + + String topicName = randomAlphabetic(10); + operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10)); + MessageIdImpl lastMessageId = + (MessageIdImpl) + sneakyAdmin( + () -> + operator() + .admin() + .topics() + .getLastMessageId( + topicNameWithPartition(topicName, 0))); + // when doing seek directly on consumer, by default it includes the specified 
messageId + seekStartPositionAndHandleSplit( + splitReader, + topicName, + 0, + new MessageIdImpl( + lastMessageId.getLedgerId(), + lastMessageId.getEntryId() - 1, + lastMessageId.getPartitionIndex())); + fetchedMessages(splitReader, 2, true); + } + + @TestTemplate + void emptyTopic(PulsarPartitionSplitReaderBase<String> splitReader) { + String topicName = randomAlphabetic(10); + operator().createTopic(topicName, DEFAULT_PARTITIONS); + seekStartPositionAndHandleSplit(splitReader, topicName, 0); + fetchedMessages(splitReader, 0, true); + } + + @TestTemplate + void emptyTopicWithoutSeek(PulsarPartitionSplitReaderBase<String> splitReader) { + String topicName = randomAlphabetic(10); + operator().createTopic(topicName, DEFAULT_PARTITIONS); + handleSplit(splitReader, topicName, 0); + fetchedMessages(splitReader, 0, true); + } + + @TestTemplate + void wakeupSplitReaderShouldNotCauseException( + PulsarPartitionSplitReaderBase<String> splitReader) { + handleSplit(splitReader, "non-exist", 0); + AtomicReference<Throwable> error = new AtomicReference<>(); + Thread t = + new Thread( + () -> { + try { + splitReader.fetch(); + } catch (Throwable e) { + error.set(e); + } + }, + "testWakeUp-thread"); + t.start(); + long deadline = System.currentTimeMillis() + 5000L; + while (t.isAlive() && System.currentTimeMillis() < deadline) { + splitReader.wakeUp(); + sleepUninterruptibly(10, TimeUnit.MILLISECONDS); + } + assertThat(error.get()).isNull(); + } + + @TestTemplate + void assignNoSplits(PulsarPartitionSplitReaderBase<String> splitReader) { + assertThat(fetchedMessage(splitReader)).isNull(); + } + /** Create a split reader with max message 1, fetch timeout 1s. 
*/ - protected abstract PulsarPartitionSplitReaderBase<String> splitReader(); + private PulsarPartitionSplitReaderBase<String> splitReader(SubscriptionType subscriptionType) { + if (subscriptionType == SubscriptionType.Failover) { + return new PulsarOrderedPartitionSplitReader<>( + operator().client(), + operator().admin(), + readerConfig(), + sourceConfig(), + flinkSchema(new SimpleStringSchema())); + } else { + return new PulsarUnorderedPartitionSplitReader<>( + operator().client(), + operator().admin(), + readerConfig(), + sourceConfig(), + flinkSchema(new SimpleStringSchema()), + null); + } + } - /** JUnit5 extension for all the TestTemplate methods in this class. */ + /** Context Provider for PulsarSplitReaderTestBase. */ public class PulsarSplitReaderInvocationContextProvider implements TestTemplateInvocationContextProvider { @@ -141,7 +363,11 @@ public abstract class PulsarPartitionSplitReaderTestBase extends PulsarTestSuite @Override public Stream<TestTemplateInvocationContext> provideTestTemplateInvocationContexts( ExtensionContext context) { - return Stream.of(new PulsarSplitReaderInvocationContext(splitReader())); + SubscriptionType subscriptionType = + (SubscriptionType) + context.getStore(PULSAR_TEST_RESOURCE_NAMESPACE) + .get(PULSAR_SOURCE_READER_SUBSCRIPTION_TYPE_STORE_KEY); + return Stream.of(new PulsarSplitReaderInvocationContext(splitReader(subscriptionType))); } } @@ -169,10 +395,8 @@ public abstract class PulsarPartitionSplitReaderTestBase extends PulsarTestSuite ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { - return parameterContext - .getParameter() - .getType() - .equals(PulsarPartitionSplitReaderBase.class); + return isAssignableFromParameterContext( + PulsarPartitionSplitReaderBase.class, parameterContext); } @Override diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReaderTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReaderTest.java index 917cacd..2cb3cb9 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReaderTest.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReaderTest.java @@ -18,21 +18,11 @@ package org.apache.flink.connector.pulsar.source.reader.split; -import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.connector.pulsar.testutils.extension.SubType; -import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.flinkSchema; +import org.apache.pulsar.client.api.SubscriptionType; /** Unit tests for {@link PulsarUnorderedPartitionSplitReaderTest}. 
*/ class PulsarUnorderedPartitionSplitReaderTest extends PulsarPartitionSplitReaderTestBase { - - @Override - protected PulsarPartitionSplitReaderBase<String> splitReader() { - return new PulsarUnorderedPartitionSplitReader<>( - operator().client(), - operator().admin(), - readerConfig(), - sourceConfig(), - flinkSchema(new SimpleStringSchema()), - null); - } + @SubType SubscriptionType subscriptionType = SubscriptionType.Shared; } diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestCommonUtils.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestCommonUtils.java new file mode 100644 index 0000000..87f3976 --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestCommonUtils.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.pulsar.testutils; + +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange; +import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; + +import org.apache.pulsar.client.api.MessageId; +import org.junit.jupiter.api.extension.ParameterContext; + +import java.util.ArrayList; +import java.util.List; + +/** Put static methods that can be used by multiple test classes. */ +public class PulsarTestCommonUtils { + + // ------- CreateSplits + /** creates a fullRange() partitionSplit. */ + public static PulsarPartitionSplit createPartitionSplit(String topic, int partitionId) { + return createPartitionSplit(topic, partitionId, Boundedness.CONTINUOUS_UNBOUNDED); + } + + public static PulsarPartitionSplit createPartitionSplit( + String topic, int partitionId, Boundedness boundedness) { + return createPartitionSplit(topic, partitionId, boundedness, MessageId.earliest); + } + + public static PulsarPartitionSplit createPartitionSplit( + String topic, int partitionId, Boundedness boundedness, MessageId latestConsumedId) { + TopicPartition topicPartition = + new TopicPartition(topic, partitionId, TopicRange.createFullRange()); + + StopCursor stopCursor = + boundedness == Boundedness.BOUNDED ? 
StopCursor.latest() : StopCursor.never(); + return new PulsarPartitionSplit(topicPartition, stopCursor, latestConsumedId, null); + } + + public static List<PulsarPartitionSplit> createPartitionSplits( + String topicName, int numSplits, Boundedness boundedness) { + List<PulsarPartitionSplit> splits = new ArrayList<>(); + for (int i = 0; i < numSplits; i++) { + splits.add(createPartitionSplit(topicName, i, boundedness)); + } + return splits; + } + + // -------- InvocationContext Utils + + public static boolean isAssignableFromParameterContext( + Class<?> requiredType, ParameterContext context) { + return requiredType.isAssignableFrom(context.getParameter().getType()); + } +} diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReaderTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/extension/SubType.java similarity index 52% copy from flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReaderTest.java copy to flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/extension/SubType.java index a94504a..b1a8362 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReaderTest.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/extension/SubType.java @@ -16,22 +16,17 @@ * limitations under the License. 
*/ -package org.apache.flink.connector.pulsar.source.reader.split; +package org.apache.flink.connector.pulsar.testutils.extension; -import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.annotation.Experimental; -import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.flinkSchema; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; -/** Unit tests for {@link PulsarOrderedPartitionSplitReaderTest}. */ -class PulsarOrderedPartitionSplitReaderTest extends PulsarPartitionSplitReaderTestBase { - - @Override - protected PulsarPartitionSplitReaderBase<String> splitReader() { - return new PulsarOrderedPartitionSplitReader<>( - operator().client(), - operator().admin(), - readerConfig(), - sourceConfig(), - flinkSchema(new SimpleStringSchema())); - } -} +/** Marks the field in test class defining {@link org.apache.pulsar.client.api.SubscriptionType}. */ +@Target(ElementType.FIELD) +@Retention(RetentionPolicy.RUNTIME) +@Experimental +public @interface SubType {} diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/extension/TestOrderlinessExtension.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/extension/TestOrderlinessExtension.java new file mode 100644 index 0000000..07c9287 --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/extension/TestOrderlinessExtension.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.testutils.extension; + +import org.apache.pulsar.client.api.SubscriptionType; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.platform.commons.support.AnnotationSupport; + +import java.lang.annotation.Annotation; +import java.util.Collection; +import java.util.List; + +/** An extension for subclasses to specify {@link org.apache.pulsar.client.api.SubscriptionType}. 
*/ +public class TestOrderlinessExtension implements BeforeAllCallback { + + public static final ExtensionContext.Namespace PULSAR_TEST_RESOURCE_NAMESPACE = + ExtensionContext.Namespace.create("pulsarTestResourceNamespace"); + public static final String PULSAR_SOURCE_READER_SUBSCRIPTION_TYPE_STORE_KEY = + "pulsarSourceReaderSubscriptionTypeStoreKey"; + + private SubscriptionType subscriptionType; + + @Override + public void beforeAll(ExtensionContext context) throws Exception { + final List<SubscriptionType> subscriptionTypes = + AnnotationSupport.findAnnotatedFieldValues( + context.getRequiredTestInstance(), SubType.class, SubscriptionType.class); + checkExactlyOneAnnotatedField(subscriptionTypes, SubType.class); + subscriptionType = subscriptionTypes.get(0); + context.getStore(PULSAR_TEST_RESOURCE_NAMESPACE) + .put(PULSAR_SOURCE_READER_SUBSCRIPTION_TYPE_STORE_KEY, subscriptionType); + } + + private void checkExactlyOneAnnotatedField( + Collection<?> fields, Class<? extends Annotation> annotation) { + if (fields.size() > 1) { + throw new IllegalStateException( + String.format( + "Multiple fields are annotated with '@%s'", + annotation.getSimpleName())); + } + if (fields.isEmpty()) { + throw new IllegalStateException( + String.format( + "No fields are annotated with '@%s'", annotation.getSimpleName())); + } + } +} diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java index 2d26925..188b77c 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java @@ -91,13 +91,18 @@ public class PulsarRuntimeOperator implements 
Serializable, Closeable { } public <T> void setupTopic(String topic, Schema<T> schema, Supplier<T> supplier) { + setupTopic(topic, schema, supplier, NUM_RECORDS_PER_PARTITION); + } + + public <T> void setupTopic( + String topic, Schema<T> schema, Supplier<T> supplier, int numRecordsPerSplit) { createTopic(topic, DEFAULT_PARTITIONS); - // Make sure every topic partition has message. + // Make sure every topic partition has messages. for (int i = 0; i < DEFAULT_PARTITIONS; i++) { String partitionName = TopicNameUtils.topicNameWithPartition(topic, i); List<T> messages = - Stream.generate(supplier).limit(NUM_RECORDS_PER_PARTITION).collect(toList()); + Stream.generate(supplier).limit(numRecordsPerSplit).collect(toList()); sendMessages(partitionName, schema, messages); } @@ -204,7 +209,6 @@ public class PulsarRuntimeOperator implements Serializable, Closeable { Configuration configuration = new Configuration(); configuration.set(PULSAR_SERVICE_URL, serviceUrl()); configuration.set(PULSAR_ADMIN_URL, adminUrl()); - return configuration; }
