imaffe commented on a change in pull request #17937:
URL: https://github.com/apache/flink/pull/17937#discussion_r779552658



##########
File path: 
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java
##########
@@ -39,78 +50,164 @@
 import org.junit.jupiter.api.extension.TestTemplateInvocationContextProvider;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Stream;
 
 import static java.time.Duration.ofSeconds;
 import static java.util.Collections.singletonList;
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
+import static 
org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyAdmin;
+import static 
org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyThrow;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_TIME;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
-import static 
org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor.never;
 import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition;
 import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange;
+import static 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.flinkSchema;
+import static 
org.apache.flink.connector.pulsar.testutils.PulsarTestCommonUtils.isAssignableFromParameterContext;
+import static 
org.apache.flink.connector.pulsar.testutils.extension.PulsarSourceOrderlinessExtension.PULSAR_SOURCE_READER_SUBSCRIPTION_TYPE_STORE_KEY;
+import static 
org.apache.flink.connector.pulsar.testutils.extension.PulsarSourceOrderlinessExtension.PULSAR_TEST_RESOURCE_NAMESPACE;
+import static 
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator.DEFAULT_PARTITIONS;
+import static 
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator.NUM_RECORDS_PER_PARTITION;
 import static org.apache.flink.core.testutils.CommonTestUtils.waitUtil;
+import static 
org.apache.flink.shaded.guava30.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.pulsar.client.api.Schema.STRING;
-import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test utils for split readers. */
+@ExtendWith({
+    PulsarSourceOrderlinessExtension.class,
+    TestLoggerExtension.class,
+})
 public abstract class PulsarPartitionSplitReaderTestBase extends 
PulsarTestSuiteBase {
 
     @RegisterExtension
     PulsarSplitReaderInvocationContextProvider provider =
             new PulsarSplitReaderInvocationContextProvider();
 
+    /** Default reader config: max message 1, fetch timeout 1s. */
     protected Configuration readerConfig() {
         Configuration config = operator().config();
         config.set(PULSAR_MAX_FETCH_RECORDS, 1);
         config.set(PULSAR_MAX_FETCH_TIME, 1000L);
         config.set(PULSAR_SUBSCRIPTION_NAME, randomAlphabetic(10));
         config.set(PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true);
-
         return config;
     }
 
     protected SourceConfiguration sourceConfig() {
         return new SourceConfiguration(readerConfig());
     }
 
-    protected SplitsAddition<PulsarPartitionSplit> createSplit(String 
topicName, int partitionId) {
+    protected void handleSplit(
+            PulsarPartitionSplitReaderBase<String> reader, String topicName, 
int partitionId) {
+        handleSplit(reader, topicName, partitionId, null);
+    }
+
+    protected void handleSplit(
+            PulsarPartitionSplitReaderBase<String> reader,
+            String topicName,
+            int partitionId,
+            MessageId startPosition) {
+        TopicPartition partition = new TopicPartition(topicName, partitionId, 
createFullRange());
+        PulsarPartitionSplit split =
+                new PulsarPartitionSplit(partition, StopCursor.never(), 
startPosition, null);
+        SplitsAddition<PulsarPartitionSplit> addition = new 
SplitsAddition<>(singletonList(split));
+        reader.handleSplitsChanges(addition);
+    }
+
+    protected void seekStartPositionAndHandleSplit(
+            PulsarPartitionSplitReaderBase<String> reader, String topicName, 
int partitionId) {
+        seekStartPositionAndHandleSplit(reader, topicName, partitionId, 
MessageId.latest);
+    }
+
+    protected void seekStartPositionAndHandleSplit(
+            PulsarPartitionSplitReaderBase<String> reader,
+            String topicName,
+            int partitionId,
+            MessageId startPosition) {
         TopicPartition partition = new TopicPartition(topicName, partitionId, 
createFullRange());
-        PulsarPartitionSplit split = new PulsarPartitionSplit(partition, 
never());
-        return new SplitsAddition<>(singletonList(split));
+        PulsarPartitionSplit split =
+                new PulsarPartitionSplit(partition, StopCursor.never(), null, 
null);
+        SplitsAddition<PulsarPartitionSplit> addition = new 
SplitsAddition<>(singletonList(split));
+
+        // create consumer and seek before split changes
+        try (Consumer<byte[]> consumer = 
reader.createPulsarConsumer(partition)) {
+            // inclusive messageId
+            StartCursor startCursor = StartCursor.fromMessageId(startPosition);
+            startCursor.seekPosition(partition.getTopic(), 
partition.getPartitionId(), consumer);
+        } catch (PulsarClientException e) {
+            sneakyThrow(e);
+        }
+
+        reader.handleSplitsChanges(addition);
     }
 
     protected <T> PulsarMessage<T> 
fetchedMessage(PulsarPartitionSplitReaderBase<T> splitReader) {
-        try {
-            RecordsWithSplitIds<PulsarMessage<T>> records = 
splitReader.fetch();
-            if (records.nextSplit() != null) {
-                return records.nextRecordFromSplit();
+        return fetchedMessages(splitReader, 1, 
false).stream().findFirst().orElse(null);
+    }
+
+    protected <T> List<PulsarMessage<T>> fetchedMessages(
+            PulsarPartitionSplitReaderBase<T> splitReader, int expectedCount, 
boolean verify) {
+        return fetchedMessages(
+                splitReader, expectedCount, verify, 
Boundedness.CONTINUOUS_UNBOUNDED);
+    }
+
+    protected <T> List<PulsarMessage<T>> fetchedMessages(

Review comment:
       I think these methods are provided here potentially for the subclasses.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to