fapaul commented on a change in pull request #17937:
URL: https://github.com/apache/flink/pull/17937#discussion_r763142827
##########
File path:
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java
##########
@@ -112,64 +119,270 @@ void startWithPeriodicPartitionDiscovery() throws
Exception {
}
}
- private PulsarSourceEnumerator createEnumerator(
+ @ParameterizedTest
+ @EnumSource(
+ value = SubscriptionType.class,
+ names = {"Failover", "Shared"})
+ void discoverPartitionsTriggersAssignments(SubscriptionType
subscriptionType) throws Throwable {
+ Set<String> prexistingTopics = setupPreexistingTopics();
+ try (MockSplitEnumeratorContext<PulsarPartitionSplit> context =
+ new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
+ PulsarSourceEnumerator enumerator =
+ createEnumerator(
+ subscriptionType,
+ prexistingTopics,
+ context,
+ DISABLE_PERIODIC_PARTITION_DISCOVERY)) {
+
+ enumerator.start();
+
+ // register reader 0, 1
+ registerReader(context, enumerator, READER0);
+ registerReader(context, enumerator, READER1);
+ assertTrue(context.getSplitsAssignmentSequence().isEmpty());
+
+ // Run the partition discover callable and check the partition
assignment.
+ runOneTimePartitionDiscovery(context);
+ verifyLastReadersAssignments(subscriptionType, context,
prexistingTopics, 1);
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = SubscriptionType.class,
+ names = {"Failover", "Shared"})
+ @Timeout(value = 60)
Review comment:
We usually discourage using timeouts because they can easily make the
test unstable due to different execution durations in different environments.
Please try to remove it.
##########
File path:
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderTestBase.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.reader.source;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
+import
org.apache.flink.connector.pulsar.source.reader.PulsarSourceReaderFactory;
+import
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
+import
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchemaInitializationContext;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestSuiteBase;
+import
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
+import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
+import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.testutils.CommonTestUtils;
+
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.policies.data.TopicStats;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
+
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
+import static
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE;
+import static
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS;
+import static
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_TIME;
+import static
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
+import static
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE;
+import static
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
+import static
org.apache.flink.connector.pulsar.testutils.PulsarTestCommonUtils.createPartitionSplit;
+import static
org.apache.flink.shaded.guava30.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+abstract class PulsarSourceReaderTestBase extends PulsarTestSuiteBase {
+
+ protected static final int DEFAULT_POLLING_TIMEOUT_SECONDS = 120;
+
+ /** Expected the add or override config entries in inherited classes. */
+ protected abstract void customConfig(Configuration config);
+
+ @BeforeEach
+ void beforeEach(TestInfo testInfo) {
+ Random random = new Random(System.currentTimeMillis());
+ String topicName = uniqueTestName(testInfo);
+ operator().setupTopic(topicName, Schema.INT32, () ->
random.nextInt(20));
+ }
+
+ @AfterEach
+ void afterEach(TestInfo testInfo) {
+ String topicName = uniqueTestName(testInfo);
+ operator().deleteTopic(topicName, true);
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void assignZeroSplitsCreatesZeroSubscription(
+ boolean autoAcknowledgementEnabled, TestInfo testInfo) throws
Exception {
+ PulsarSourceReaderBase<Integer> reader =
sourceReader(autoAcknowledgementEnabled);
+ String topicName = uniqueTestName(testInfo);
+ reader.snapshotState(100L);
+ reader.notifyCheckpointComplete(100L);
+ // Verify the committed offsets.
+ reader.close();
+ for (int i = 0; i < PulsarRuntimeOperator.DEFAULT_PARTITIONS; i++) {
+
verifyNoSubscriptionCreated(TopicNameUtils.topicNameWithPartition(topicName,
i));
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void assigningEmptySplits(boolean autoAcknowledgementEnabled, TestInfo
testInfo)
+ throws Exception {
+ PulsarSourceReaderBase<Integer> reader =
sourceReader(autoAcknowledgementEnabled);
+ String topicName = uniqueTestName(testInfo);
+ final PulsarPartitionSplit emptySplit =
+ createPartitionSplit(
+ topicName, 0, Boundedness.CONTINUOUS_UNBOUNDED,
MessageId.latest);
+
+ reader.addSplits(Collections.singletonList(emptySplit));
+
+ TestingReaderOutput<Integer> output = new TestingReaderOutput<>();
+ InputStatus status;
+ status = reader.pollNext(output);
+ // We sleep for 1 sec to avoid entering false negative case:
+ // we perform assertion when add splits task
+ // is dequeued but not executed.
+ sleepUninterruptibly(1, TimeUnit.SECONDS);
Review comment:
Can you explain why we need this? This also seems like a source of
flakiness.
##########
File path:
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java
##########
@@ -126,63 +196,116 @@ void
pollMessageAfterTimeout(PulsarPartitionSplitReaderBase<String> splitReader)
"Couldn't poll message from Pulsar.");
}
- /** Create a split reader with max message 1, fetch timeout 1s. */
- protected abstract PulsarPartitionSplitReaderBase<String> splitReader();
+ @Test
+ void consumeMessageCreatedAfterHandleSplitChangesAndFetch() {
+ PulsarPartitionSplitReaderBase<String> splitReader = splitReader();
+ String topicName = randomAlphabetic(10);
+ seekStartPositionAndHandleSplit(splitReader, topicName, 0);
+ operator().sendMessage(topicNameWithPartition(topicName, 0), STRING,
randomAlphabetic(10));
+ fetchedMessages(splitReader, 1, true);
+ }
- /** JUnit5 extension for all the TestTemplate methods in this class. */
- public class PulsarSplitReaderInvocationContextProvider
- implements TestTemplateInvocationContextProvider {
+ @Test
+ void consumeMessageCreatedBeforeHandleSplitsChanges() {
+ PulsarPartitionSplitReaderBase<String> splitReader = splitReader();
+ String topicName = randomAlphabetic(10);
+ operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
+ seekStartPositionAndHandleSplit(splitReader, topicName, 0);
+ fetchedMessages(splitReader, 0, true);
+ }
- @Override
- public boolean supportsTestTemplate(ExtensionContext context) {
- return true;
- }
+ @Test
+ void
consumeMessageCreatedBeforeHandleSplitsChangesAndResetToEarliestPosition() {
+ PulsarPartitionSplitReaderBase<String> splitReader = splitReader();
+ String topicName = randomAlphabetic(10);
+ operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
+ seekStartPositionAndHandleSplit(splitReader, topicName, 0,
MessageId.earliest);
+ fetchedMessages(splitReader, NUM_RECORDS_PER_PARTITION, true);
+ }
- @Override
- public Stream<TestTemplateInvocationContext>
provideTestTemplateInvocationContexts(
- ExtensionContext context) {
- return Stream.of(new
PulsarSplitReaderInvocationContext(splitReader()));
- }
+ @Test
+ void
consumeMessageCreatedBeforeHandleSplitsChangesAndResetToLatestPosition() {
+ PulsarPartitionSplitReaderBase<String> splitReader = splitReader();
+ String topicName = randomAlphabetic(10);
+ operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
+ seekStartPositionAndHandleSplit(splitReader, topicName, 0,
MessageId.latest);
+ fetchedMessages(splitReader, 0, true);
}
- /** Parameter resolver for Split Reader. */
- public static class PulsarSplitReaderInvocationContext
- implements TestTemplateInvocationContext {
+ @Test
+ void
consumeMessageCreatedBeforeHandleSplitsChangesAndUseSecondLastMessageIdCursor()
{
+ PulsarPartitionSplitReaderBase<String> splitReader = splitReader();
+ String topicName = randomAlphabetic(10);
+ operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
+ MessageIdImpl lastMessageId =
+ (MessageIdImpl)
+ sneakyAdmin(
+ () ->
+ operator()
+ .admin()
+ .topics()
+ .getLastMessageId(
+
topicNameWithPartition(topicName, 0)));
+ // when doing seek directly on consumer, by default it includes the
specified messageId
+ seekStartPositionAndHandleSplit(
+ splitReader,
+ topicName,
+ 0,
+ new MessageIdImpl(
+ lastMessageId.getLedgerId(),
+ lastMessageId.getEntryId() - 1,
+ lastMessageId.getPartitionIndex()));
+ fetchedMessages(splitReader, 2, true);
+ }
- private final PulsarPartitionSplitReaderBase<?> splitReader;
+ @Test
+ void emptyTopic() {
+ PulsarPartitionSplitReaderBase<String> splitReader = splitReader();
+ String topicName = randomAlphabetic(10);
+ operator().createTopic(topicName, DEFAULT_PARTITIONS);
+ seekStartPositionAndHandleSplit(splitReader, topicName, 0);
+ fetchedMessages(splitReader, 0, true);
+ }
- public
PulsarSplitReaderInvocationContext(PulsarPartitionSplitReaderBase<?>
splitReader) {
- this.splitReader = checkNotNull(splitReader);
- }
+ @Test
+ void emptyTopicWithoutSeek() {
+ PulsarPartitionSplitReaderBase<String> splitReader = splitReader();
+ String topicName = randomAlphabetic(10);
+ operator().createTopic(topicName, DEFAULT_PARTITIONS);
+ handleSplit(splitReader, topicName, 0);
+ fetchedMessages(splitReader, 0, true);
+ }
- @Override
- public String getDisplayName(int invocationIndex) {
- return splitReader.getClass().getSimpleName();
+ @Test
+ void wakeupSplitReaderShouldNotCauseException() {
+ PulsarPartitionSplitReaderBase<String> splitReader = splitReader();
+ handleSplit(splitReader, "non-exist", 0);
+ AtomicReference<Throwable> error = new AtomicReference<>();
+ Thread t =
+ new Thread(
+ () -> {
+ try {
+ splitReader.fetch();
+ } catch (Throwable e) {
+ error.set(e);
+ }
+ },
+ "testWakeUp-thread");
+ t.start();
+ long deadline = System.currentTimeMillis() + 5000L;
+ while (t.isAlive() && System.currentTimeMillis() < deadline) {
+ splitReader.wakeUp();
+ sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
}
+ assertNull(error.get());
+ }
- @Override
- public List<Extension> getAdditionalExtensions() {
- return Collections.singletonList(
- new ParameterResolver() {
- @Override
- public boolean supportsParameter(
- ParameterContext parameterContext,
- ExtensionContext extensionContext)
- throws ParameterResolutionException {
- return parameterContext
- .getParameter()
- .getType()
-
.equals(PulsarPartitionSplitReaderBase.class);
- }
-
- @Override
- public Object resolveParameter(
- ParameterContext parameterContext,
- ExtensionContext extensionContext)
- throws ParameterResolutionException {
- return splitReader;
- }
- });
- }
+ @Test
+ void assignNoSplits() {
+ PulsarPartitionSplitReaderBase<String> splitReader = splitReader();
+ assertNull(fetchedMessage(splitReader));
}
+
+ /** Create a split reader with max message 1, fetch timeout 1s. */
+ protected abstract PulsarPartitionSplitReaderBase<String> splitReader();
Review comment:
You can implement an extension that instantiates the test SplitReader in
the subclasses and passes it directly to the test, without the need for an
abstract method. Take a look at how `SourceTestSuiteBase` does this
with the `TestEnvironment` [1].
[1]
https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/testsuites/SourceTestSuiteBase.java
##########
File path:
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReaderTest.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.reader.source;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
+import
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
+import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
+import
org.apache.flink.connectors.test.common.junit.extensions.TestLoggerExtension;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.testutils.CommonTestUtils;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.flink.connector.pulsar.testutils.PulsarTestCommonUtils.createPartitionSplits;
+import static
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator.DEFAULT_PARTITIONS;
+import static
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator.NUM_RECORDS_PER_PARTITION;
+import static
org.apache.flink.shaded.guava30.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@ExtendWith(TestLoggerExtension.class)
+class PulsarOrderedSourceReaderTest extends PulsarSourceReaderTestBase {
+
+ private static final int MAX_EMPTY_POLLING_TIMES = 10;
+
+ @Override
+ protected void customConfig(Configuration config) {
+ config.set(PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE,
SubscriptionType.Failover);
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void pollMessageWithIntervalLongerThanTimeout(
+ boolean autoAcknowledgementEnabled, TestInfo testInfo) throws
Exception {
+ PulsarSourceReaderBase<Integer> reader =
sourceReader(autoAcknowledgementEnabled);
+ Boundedness boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
+ String topicName = uniqueTestName(testInfo);
Review comment:
Isn't this topic already created before every test in
`PulsarSourceReaderTestBase#beforeEach`?
In general, I'd discourage sharing test setup code through inheritance,
because it quickly becomes unmaintainable and is hard to extend. I prefer
to use extensions that can be reused in any test.
##########
File path:
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReaderTest.java
##########
@@ -34,4 +43,57 @@
sourceConfig(),
flinkSchema(new SimpleStringSchema()));
}
+
+ @Test
+ void consumeMessageCreatedBeforeHandleSplitsChangesWithoutSeek() {
+ PulsarPartitionSplitReaderBase<String> splitReader = splitReader();
+ String topicName = randomAlphabetic(10);
+ operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
+ handleSplit(splitReader, topicName, 0);
+ fetchedMessages(splitReader, 0, true);
+ }
+
+ @Test
+ void
consumeMessageCreatedBeforeHandleSplitsChangesAndUseLatestStartCursorWithoutSeek()
{
+ PulsarPartitionSplitReaderBase<String> splitReader = splitReader();
+ String topicName = randomAlphabetic(10);
+ operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
+ handleSplit(splitReader, topicName, 0, MessageId.latest);
+ fetchedMessages(splitReader, 0, true);
+ }
+
+ @Test
+ void
consumeMessageCreatedBeforeHandleSplitsChangesAndUseEarliestStartCursorWithoutSeek()
{
+ PulsarPartitionSplitReaderBase<String> splitReader = splitReader();
+ String topicName = randomAlphabetic(10);
+ operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
+ handleSplit(splitReader, topicName, 0, MessageId.earliest);
+ fetchedMessages(splitReader, NUM_RECORDS_PER_PARTITION, true);
+ }
Review comment:
Nit: Consider using a parameterized test to avoid the code
duplication.
##########
File path:
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java
##########
@@ -18,95 +18,165 @@
package org.apache.flink.connector.pulsar.source.reader.split;
+import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import
org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.flink.connector.pulsar.testutils.PulsarTestSuiteBase;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.TestTemplate;
-import org.junit.jupiter.api.extension.Extension;
-import org.junit.jupiter.api.extension.ExtensionContext;
-import org.junit.jupiter.api.extension.ParameterContext;
-import org.junit.jupiter.api.extension.ParameterResolutionException;
-import org.junit.jupiter.api.extension.ParameterResolver;
-import org.junit.jupiter.api.extension.RegisterExtension;
-import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
-import org.junit.jupiter.api.extension.TestTemplateInvocationContextProvider;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.junit.jupiter.api.Test;
import java.io.IOException;
-import java.util.Collections;
+import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.stream.Stream;
+import java.util.concurrent.atomic.AtomicReference;
import static java.time.Duration.ofSeconds;
import static java.util.Collections.singletonList;
import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
+import static
org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyAdmin;
+import static
org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyThrow;
import static
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE;
import static
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS;
import static
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_TIME;
import static
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
-import static
org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor.never;
import static
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition;
import static
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange;
+import static
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator.DEFAULT_PARTITIONS;
+import static
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator.NUM_RECORDS_PER_PARTITION;
import static org.apache.flink.core.testutils.CommonTestUtils.waitUtil;
-import static org.apache.flink.util.Preconditions.checkNotNull;
+import static
org.apache.flink.shaded.guava30.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static org.apache.pulsar.client.api.Schema.STRING;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertNull;
/** Test utils for split readers. */
public abstract class PulsarPartitionSplitReaderTestBase extends
PulsarTestSuiteBase {
- @RegisterExtension
- PulsarSplitReaderInvocationContextProvider provider =
- new PulsarSplitReaderInvocationContextProvider();
-
+ /** Default reader config: max message 1, fetch timeout 1s. */
protected Configuration readerConfig() {
Configuration config = operator().config();
config.set(PULSAR_MAX_FETCH_RECORDS, 1);
config.set(PULSAR_MAX_FETCH_TIME, 1000L);
config.set(PULSAR_SUBSCRIPTION_NAME, randomAlphabetic(10));
config.set(PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true);
-
return config;
}
protected SourceConfiguration sourceConfig() {
return new SourceConfiguration(readerConfig());
}
- protected SplitsAddition<PulsarPartitionSplit> createSplit(String
topicName, int partitionId) {
+ protected void handleSplit(
+ PulsarPartitionSplitReaderBase<String> reader, String topicName,
int partitionId) {
+ handleSplit(reader, topicName, partitionId, null);
+ }
+
+ protected void handleSplit(
+ PulsarPartitionSplitReaderBase<String> reader,
+ String topicName,
+ int partitionId,
+ MessageId startPosition) {
TopicPartition partition = new TopicPartition(topicName, partitionId,
createFullRange());
- PulsarPartitionSplit split = new PulsarPartitionSplit(partition,
never());
- return new SplitsAddition<>(singletonList(split));
+ PulsarPartitionSplit split =
+ new PulsarPartitionSplit(partition, StopCursor.never(),
startPosition, null);
+ SplitsAddition<PulsarPartitionSplit> addition = new
SplitsAddition<>(singletonList(split));
+ reader.handleSplitsChanges(addition);
+ }
+
+ protected void seekStartPositionAndHandleSplit(
+ PulsarPartitionSplitReaderBase<String> reader, String topicName,
int partitionId) {
+ seekStartPositionAndHandleSplit(reader, topicName, partitionId,
MessageId.latest);
+ }
+
+ protected void seekStartPositionAndHandleSplit(
+ PulsarPartitionSplitReaderBase<String> reader,
+ String topicName,
+ int partitionId,
+ MessageId startPosition) {
+ TopicPartition partition = new TopicPartition(topicName, partitionId,
createFullRange());
+ PulsarPartitionSplit split =
+ new PulsarPartitionSplit(partition, StopCursor.never(), null,
null);
+ SplitsAddition<PulsarPartitionSplit> addition = new
SplitsAddition<>(singletonList(split));
+
+ // create consumer and seek before split changes
+ try (Consumer<byte[]> consumer =
reader.createPulsarConsumer(partition)) {
+ // inclusive messageId
+ StartCursor startCursor = StartCursor.fromMessageId(startPosition);
+ startCursor.seekPosition(partition.getTopic(),
partition.getPartitionId(), consumer);
+ } catch (PulsarClientException e) {
+ sneakyThrow(e);
+ }
+
+ reader.handleSplitsChanges(addition);
}
protected <T> PulsarMessage<T>
fetchedMessage(PulsarPartitionSplitReaderBase<T> splitReader) {
- try {
- RecordsWithSplitIds<PulsarMessage<T>> records =
splitReader.fetch();
- if (records.nextSplit() != null) {
- return records.nextRecordFromSplit();
+ return fetchedMessages(splitReader, 1,
false).stream().findFirst().orElse(null);
+ }
+
+ protected <T> List<PulsarMessage<T>> fetchedMessages(
+ PulsarPartitionSplitReaderBase<T> splitReader, int expectedCount,
boolean verify) {
+ return fetchedMessages(
+ splitReader, expectedCount, verify,
Boundedness.CONTINUOUS_UNBOUNDED);
+ }
+
+ protected <T> List<PulsarMessage<T>> fetchedMessages(
Review comment:
Please move these methods to a util class or a test extension.
##########
File path:
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderTestBase.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.reader.source;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
+import
org.apache.flink.connector.pulsar.source.reader.PulsarSourceReaderFactory;
+import
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
+import
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchemaInitializationContext;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestSuiteBase;
+import
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
+import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
+import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.testutils.CommonTestUtils;
+
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.policies.data.TopicStats;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
+
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
+import static
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE;
+import static
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS;
+import static
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_TIME;
+import static
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
+import static
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE;
+import static
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
+import static
org.apache.flink.connector.pulsar.testutils.PulsarTestCommonUtils.createPartitionSplit;
+import static
org.apache.flink.shaded.guava30.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+abstract class PulsarSourceReaderTestBase extends PulsarTestSuiteBase {
+
+ protected static final int DEFAULT_POLLING_TIMEOUT_SECONDS = 120;
+
+ /** Expected the add or override config entries in inherited classes. */
+ protected abstract void customConfig(Configuration config);
+
+ @BeforeEach
+ void beforeEach(TestInfo testInfo) {
+ Random random = new Random(System.currentTimeMillis());
+ String topicName = uniqueTestName(testInfo);
+ operator().setupTopic(topicName, Schema.INT32, () ->
random.nextInt(20));
+ }
+
+ @AfterEach
+ void afterEach(TestInfo testInfo) {
+ String topicName = uniqueTestName(testInfo);
+ operator().deleteTopic(topicName, true);
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void assignZeroSplitsCreatesZeroSubscription(
+ boolean autoAcknowledgementEnabled, TestInfo testInfo) throws
Exception {
+ PulsarSourceReaderBase<Integer> reader =
sourceReader(autoAcknowledgementEnabled);
+ String topicName = uniqueTestName(testInfo);
+ reader.snapshotState(100L);
+ reader.notifyCheckpointComplete(100L);
+ // Verify the committed offsets.
+ reader.close();
+ for (int i = 0; i < PulsarRuntimeOperator.DEFAULT_PARTITIONS; i++) {
+
verifyNoSubscriptionCreated(TopicNameUtils.topicNameWithPartition(topicName,
i));
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void assigningEmptySplits(boolean autoAcknowledgementEnabled, TestInfo
testInfo)
+ throws Exception {
+ PulsarSourceReaderBase<Integer> reader =
sourceReader(autoAcknowledgementEnabled);
+ String topicName = uniqueTestName(testInfo);
+ final PulsarPartitionSplit emptySplit =
+ createPartitionSplit(
+ topicName, 0, Boundedness.CONTINUOUS_UNBOUNDED,
MessageId.latest);
+
+ reader.addSplits(Collections.singletonList(emptySplit));
+
+ TestingReaderOutput<Integer> output = new TestingReaderOutput<>();
+ InputStatus status;
+ status = reader.pollNext(output);
+ // We sleep for 1 sec to avoid entering false negative case:
+ // we perform assertion when add splits task
+ // is dequeued but not executed.
+ sleepUninterruptibly(1, TimeUnit.SECONDS);
+ assertEquals(status, InputStatus.NOTHING_AVAILABLE);
+ reader.close();
+ }
+
+ // ---------------------
+ protected String uniqueTestName(TestInfo testInfo) {
Review comment:
Please move all util methods either into a dedicated helper class (if
they can be static) or into a new test extension for Pulsar.
##########
File path:
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReaderTest.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.reader.source;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
+import
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
+import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
+import
org.apache.flink.connectors.test.common.junit.extensions.TestLoggerExtension;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.testutils.CommonTestUtils;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.flink.connector.pulsar.testutils.PulsarTestCommonUtils.createPartitionSplits;
+import static
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator.DEFAULT_PARTITIONS;
+import static
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator.NUM_RECORDS_PER_PARTITION;
+import static
org.apache.flink.shaded.guava30.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@ExtendWith(TestLoggerExtension.class)
+class PulsarOrderedSourceReaderTest extends PulsarSourceReaderTestBase {
+
+ private static final int MAX_EMPTY_POLLING_TIMES = 10;
+
+ @Override
+ protected void customConfig(Configuration config) {
+ config.set(PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE,
SubscriptionType.Failover);
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void pollMessageWithIntervalLongerThanTimeout(
+ boolean autoAcknowledgementEnabled, TestInfo testInfo) throws
Exception {
+ PulsarSourceReaderBase<Integer> reader =
sourceReader(autoAcknowledgementEnabled);
+ Boundedness boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
+ String topicName = uniqueTestName(testInfo);
+ setupSourceReader(reader, topicName, 0, boundedness);
+ sleepUninterruptibly(15, TimeUnit.SECONDS);
+ operator()
+ .sendMessage(TopicNameUtils.topicNameWithPartition(topicName,
0), Schema.INT32, 5);
+
+ // execute
+ TestingReaderOutput<Integer> output = new TestingReaderOutput<>();
+ int expectedRecords = NUM_RECORDS_PER_PARTITION + 1;
+ pollUtilReadExpectedNumberOfRecordsAndValidate(
+ reader,
+ output,
+ expectedRecords,
+ TopicNameUtils.topicNameWithPartition(topicName, 0));
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void consumeMessagesAndCommitOffsets(boolean autoAcknowledgementEnabled,
TestInfo testInfo)
+ throws Exception {
+ // set up the partition
+ PulsarOrderedSourceReader<Integer> reader =
+ (PulsarOrderedSourceReader<Integer>)
sourceReader(autoAcknowledgementEnabled);
+ String topicName = uniqueTestName(testInfo);
+ setupSourceReader(reader, topicName, 0,
Boundedness.CONTINUOUS_UNBOUNDED);
+
+ // waiting for results
+ TestingReaderOutput<Integer> output = new TestingReaderOutput<>();
+ pollUntil(
+ reader,
+ output,
+ () -> output.getEmittedRecords().size() ==
NUM_RECORDS_PER_PARTITION,
+ "The output didn't poll enough records before timeout.");
+ reader.snapshotState(100L);
+ reader.notifyCheckpointComplete(100L);
+ pollUntil(
+ reader,
+ output,
+ reader.cursorsToCommit::isEmpty,
+ "The offset commit did not finish before timeout.");
+
+ // verify consumption
+ reader.close();
+ verifyAllMessageAcknowledged(
+ NUM_RECORDS_PER_PARTITION,
TopicNameUtils.topicNameWithPartition(topicName, 0));
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void offsetCommitOnCheckpointComplete(boolean autoAcknowledgementEnabled,
TestInfo testInfo)
+ throws Exception {
+ PulsarOrderedSourceReader<Integer> reader =
+ (PulsarOrderedSourceReader<Integer>)
sourceReader(autoAcknowledgementEnabled);
+ String topicName = uniqueTestName(testInfo);
+ // consume more than 1 partition
+ reader.addSplits(
+ createPartitionSplits(
+ topicName, DEFAULT_PARTITIONS,
Boundedness.CONTINUOUS_UNBOUNDED));
+ reader.notifyNoMoreSplits();
+ TestingReaderOutput<Integer> output = new TestingReaderOutput<>();
+ long checkpointId = 0;
+ int emptyResultTime = 0;
+ InputStatus status;
+ do {
+ checkpointId++;
+ status = reader.pollNext(output);
+ // Create a checkpoint for each message consumption, but not
complete them.
+ reader.snapshotState(checkpointId);
+ // the first couple of pollNext() might return NOTHING_AVAILABLE
before data appears
+ if (InputStatus.NOTHING_AVAILABLE == status) {
+ emptyResultTime++;
+ sleepUninterruptibly(1, TimeUnit.SECONDS);
+ }
+
+ } while (emptyResultTime < MAX_EMPTY_POLLING_TIMES
+ && status != InputStatus.END_OF_INPUT
+ && output.getEmittedRecords().size()
+ < NUM_RECORDS_PER_PARTITION * DEFAULT_PARTITIONS);
+
+ // The completion of the last checkpoint should subsume all previous
checkpoints.
+ assertEquals(checkpointId, reader.cursorsToCommit.size());
+
+ long lastCheckpointId = checkpointId;
+ CommonTestUtils.waitUtil(
+ () -> {
+ try {
+ reader.notifyCheckpointComplete(lastCheckpointId);
Review comment:
Why do we need to call `notifyCheckpointComplete` multiple times? AFAICT
the first call should already subsume all checkpoints.
##########
File path:
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilderTest.java
##########
@@ -67,4 +71,60 @@ void rangeGeneratorRequiresKeyShared() {
IllegalArgumentException.class,
() -> builder.setRangeGenerator(new UniformRangeGenerator()));
}
+
+ @Test
+ void missingRequiredField() {
+ PulsarSourceBuilder<String> builder = new PulsarSourceBuilder<>();
+ assertThrows(IllegalArgumentException.class, builder::build);
Review comment:
The community recently decided to use AssertJ [1] for new assertions. If
you rebase your branch onto master, you can start using it immediately.
[1]
##########
File path:
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java
##########
@@ -53,9 +53,13 @@ public MessageIdStartCursor(MessageId messageId, boolean
inclusive) {
messageId instanceof MessageIdImpl,
"We only support normal message id and batch message id.");
MessageIdImpl id = (MessageIdImpl) messageId;
- this.messageId =
- new MessageIdImpl(
- id.getLedgerId(), id.getEntryId() + 1,
id.getPartitionIndex());
+ if (MessageId.earliest.equals(messageId) ||
MessageId.latest.equals(messageId)) {
+ this.messageId = messageId;
+ } else {
+ this.messageId =
+ new MessageIdImpl(
+ id.getLedgerId(), id.getEntryId() + 1,
id.getPartitionIndex());
+ }
Review comment:
I suspect this is the bug fix? Please put the bug fix and its related
test into a separate commit with a short explanation of what the change
fixes, to make the git history easier to follow.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]