imaffe commented on a change in pull request #17937:
URL: https://github.com/apache/flink/pull/17937#discussion_r763972317



##########
File path: 
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java
##########
@@ -18,95 +18,165 @@
 
 package org.apache.flink.connector.pulsar.source.reader.split;
 
+import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import 
org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
 import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestSuiteBase;
 
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.TestTemplate;
-import org.junit.jupiter.api.extension.Extension;
-import org.junit.jupiter.api.extension.ExtensionContext;
-import org.junit.jupiter.api.extension.ParameterContext;
-import org.junit.jupiter.api.extension.ParameterResolutionException;
-import org.junit.jupiter.api.extension.ParameterResolver;
-import org.junit.jupiter.api.extension.RegisterExtension;
-import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
-import org.junit.jupiter.api.extension.TestTemplateInvocationContextProvider;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
-import java.util.Collections;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.stream.Stream;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static java.time.Duration.ofSeconds;
 import static java.util.Collections.singletonList;
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
+import static 
org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyAdmin;
+import static 
org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyThrow;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_TIME;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
-import static 
org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor.never;
 import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition;
 import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange;
+import static 
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator.DEFAULT_PARTITIONS;
+import static 
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator.NUM_RECORDS_PER_PARTITION;
 import static org.apache.flink.core.testutils.CommonTestUtils.waitUtil;
-import static org.apache.flink.util.Preconditions.checkNotNull;
+import static 
org.apache.flink.shaded.guava30.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 import static org.apache.pulsar.client.api.Schema.STRING;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertNull;
 
 /** Test utils for split readers. */
 public abstract class PulsarPartitionSplitReaderTestBase extends 
PulsarTestSuiteBase {
 
-    @RegisterExtension
-    PulsarSplitReaderInvocationContextProvider provider =
-            new PulsarSplitReaderInvocationContextProvider();
-
+    /** Default reader config: max message 1, fetch timeout 1s. */
     protected Configuration readerConfig() {
         Configuration config = operator().config();
         config.set(PULSAR_MAX_FETCH_RECORDS, 1);
         config.set(PULSAR_MAX_FETCH_TIME, 1000L);
         config.set(PULSAR_SUBSCRIPTION_NAME, randomAlphabetic(10));
         config.set(PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true);
-
         return config;
     }
 
     protected SourceConfiguration sourceConfig() {
         return new SourceConfiguration(readerConfig());
     }
 
-    protected SplitsAddition<PulsarPartitionSplit> createSplit(String 
topicName, int partitionId) {
+    protected void handleSplit(
+            PulsarPartitionSplitReaderBase<String> reader, String topicName, 
int partitionId) {
+        handleSplit(reader, topicName, partitionId, null);
+    }
+
+    protected void handleSplit(
+            PulsarPartitionSplitReaderBase<String> reader,
+            String topicName,
+            int partitionId,
+            MessageId startPosition) {
         TopicPartition partition = new TopicPartition(topicName, partitionId, 
createFullRange());
-        PulsarPartitionSplit split = new PulsarPartitionSplit(partition, 
never());
-        return new SplitsAddition<>(singletonList(split));
+        PulsarPartitionSplit split =
+                new PulsarPartitionSplit(partition, StopCursor.never(), 
startPosition, null);
+        SplitsAddition<PulsarPartitionSplit> addition = new 
SplitsAddition<>(singletonList(split));
+        reader.handleSplitsChanges(addition);
+    }
+
+    protected void seekStartPositionAndHandleSplit(
+            PulsarPartitionSplitReaderBase<String> reader, String topicName, 
int partitionId) {
+        seekStartPositionAndHandleSplit(reader, topicName, partitionId, 
MessageId.latest);
+    }
+
+    protected void seekStartPositionAndHandleSplit(
+            PulsarPartitionSplitReaderBase<String> reader,
+            String topicName,
+            int partitionId,
+            MessageId startPosition) {
+        TopicPartition partition = new TopicPartition(topicName, partitionId, 
createFullRange());
+        PulsarPartitionSplit split =
+                new PulsarPartitionSplit(partition, StopCursor.never(), null, 
null);
+        SplitsAddition<PulsarPartitionSplit> addition = new 
SplitsAddition<>(singletonList(split));
+
+        // create consumer and seek before split changes
+        try (Consumer<byte[]> consumer = 
reader.createPulsarConsumer(partition)) {
+            // inclusive messageId
+            StartCursor startCursor = StartCursor.fromMessageId(startPosition);
+            startCursor.seekPosition(partition.getTopic(), 
partition.getPartitionId(), consumer);
+        } catch (PulsarClientException e) {
+            sneakyThrow(e);
+        }
+
+        reader.handleSplitsChanges(addition);
     }
 
     protected <T> PulsarMessage<T> 
fetchedMessage(PulsarPartitionSplitReaderBase<T> splitReader) {
-        try {
-            RecordsWithSplitIds<PulsarMessage<T>> records = 
splitReader.fetch();
-            if (records.nextSplit() != null) {
-                return records.nextRecordFromSplit();
+        return fetchedMessages(splitReader, 1, 
false).stream().findFirst().orElse(null);
+    }
+
+    protected <T> List<PulsarMessage<T>> fetchedMessages(
+            PulsarPartitionSplitReaderBase<T> splitReader, int expectedCount, 
boolean verify) {
+        return fetchedMessages(
+                splitReader, expectedCount, verify, 
Boundedness.CONTINUOUS_UNBOUNDED);
+    }
+
+    protected <T> List<PulsarMessage<T>> fetchedMessages(

Review comment:
       These methods were created to avoid code duplication and are only used in 
this test class. The logic here is tightly coupled with the testing logic. 
Moving these methods to a common test util class might cause them to lose their 
context when someone reads them for the first time. In this case, do we still 
need to move them to a util class or use a test extension (e.g. call 
fetchedMessages() in a BeforeCallback)?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to