AHeise commented on a change in pull request #15304:
URL: https://github.com/apache/flink/pull/15304#discussion_r686751352



##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.reader.split;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import 
org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import 
org.apache.flink.connector.pulsar.common.config.ConfigurationDataCustomizer;
+import org.apache.flink.connector.pulsar.source.config.CursorVerification;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import 
org.apache.flink.connector.pulsar.source.enumerator.cursor.CursorPosition;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
+import 
org.apache.flink.connector.pulsar.source.reader.message.CollectorSupplier;
+import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.KeySharedPolicy;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static java.util.Collections.singleton;
+import static 
org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyAdmin;
+import static 
org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
+import static 
org.apache.flink.connector.pulsar.source.config.CursorVerification.FAIL_ON_MISMATCH;
+import static 
org.apache.flink.connector.pulsar.source.config.CursorVerification.WARN_ON_MISMATCH;
+import static 
org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils.createConsumer;
+
+/**
+ * The common partition split reader.
+ *
+ * @param <OUT> the type of the pulsar source message that would be serialized 
to downstream.
+ */
+abstract class PulsarPartitionSplitReaderBase<OUT>
+        implements SplitReader<PulsarMessage<OUT>, PulsarPartitionSplit> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PulsarPartitionSplitReaderBase.class);
+
+    protected final PulsarClient pulsarClient;
+    protected final PulsarAdmin pulsarAdmin;
+    protected final Configuration configuration;
+    protected final SourceConfiguration sourceConfiguration;
+    protected final 
ConfigurationDataCustomizer<ConsumerConfigurationData<byte[]>>
+            consumerConfigurationCustomizer;
+    protected final PulsarDeserializationSchema<OUT> deserializationSchema;
+    protected final AtomicBoolean wakeup;
+
+    protected Consumer<byte[]> pulsarConsumer;
+    protected PulsarPartitionSplit registeredSplit;
+
+    protected PulsarPartitionSplitReaderBase(
+            PulsarClient pulsarClient,
+            PulsarAdmin pulsarAdmin,
+            Configuration configuration,
+            SourceConfiguration sourceConfiguration,
+            ConfigurationDataCustomizer<ConsumerConfigurationData<byte[]>>
+                    consumerConfigurationCustomizer,
+            PulsarDeserializationSchema<OUT> deserializationSchema) {
+        this.pulsarClient = pulsarClient;
+        this.pulsarAdmin = pulsarAdmin;
+        this.configuration = configuration;
+        this.sourceConfiguration = sourceConfiguration;
+        this.consumerConfigurationCustomizer = consumerConfigurationCustomizer;
+        this.deserializationSchema = deserializationSchema;
+        this.wakeup = new AtomicBoolean(false);
+    }
+
+    @Override
+    public RecordsWithSplitIds<PulsarMessage<OUT>> fetch() {
+        RecordsBySplits.Builder<PulsarMessage<OUT>> builder = new 
RecordsBySplits.Builder<>();
+
+        // Return when no split registered to this reader.
+        if (pulsarConsumer == null || registeredSplit == null) {
+            return builder.build();
+        }
+
+        // Set wakeup to false for start consuming.
+        wakeup.compareAndSet(true, false);
+
+        StopCursor stopCursor = registeredSplit.getStopCursor();
+        String splitId = registeredSplit.splitId();
+        CollectorSupplier<OUT> supplier = new CollectorSupplier<>(splitId, 
builder);
+        Deadline deadline = 
Deadline.fromNow(sourceConfiguration.getMaxFetchTime());
+
+        // Consume message from pulsar until it was woke up by flink reader.
+        for (int messageNum = 0;
+                messageNum < sourceConfiguration.getMaxFetchRecords()
+                        && deadline.hasTimeLeft()
+                        && isNotWakeup();
+                messageNum++) {
+            try {
+                Duration timeout = deadline.timeLeftIfAny();
+                Message<byte[]> message = pollMessage(timeout);
+
+                if (stopCursor.shouldStop(message)) {
+                    builder.addFinishedSplit(splitId);
+                    break;
+                }
+
+                // Deserialize message.
+                deserializationSchema.deserialize(message, 
supplier.collector(message));
+
+                // Acknowledge message if need.
+                finishedPollMessage(message);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                break;
+            } catch (TimeoutException e) {
+                break;
+            } catch (Exception e) {
+                LOG.error("", e);
+                break;
+            }
+        }
+
+        return builder.build();
+    }
+
+    @Override
+    public void handleSplitsChanges(SplitsChange<PulsarPartitionSplit> 
splitsChanges) {
+        LOG.debug("Handle split changes {}", splitsChanges);
+
+        // Get all the partition assignments and stopping offsets.
+        if (!(splitsChanges instanceof SplitsAddition)) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "The SplitChange type of %s is not supported.",
+                            splitsChanges.getClass()));
+        }
+
+        if (registeredSplit != null) {
+            throw new IllegalStateException("This split reader have assigned 
split.");
+        }
+
+        List<PulsarPartitionSplit> newSplits = splitsChanges.splits();
+        Preconditions.checkArgument(
+                newSplits.size() == 1, "This pulsar split reader only support 
one split.");
+        PulsarPartitionSplit newSplit = newSplits.get(0);
+
+        // Create pulsar consumer.
+        Consumer<byte[]> consumer = createPulsarConsumer(newSplit);
+
+        // Open start & stop cursor.
+        newSplit.open(configuration, sourceConfiguration);
+
+        // Start Consumer.
+        startConsumer(newSplit, consumer);
+
+        LOG.info("Register split {} consumer for current reader.", newSplit);
+        this.registeredSplit = newSplit;
+        this.pulsarConsumer = consumer;
+    }
+
+    @Override
+    public void wakeUp() {
+        wakeup.compareAndSet(false, true);
+    }
+
+    @Override
+    public void close() {
+        if (pulsarConsumer != null) {
+            sneakyClient(() -> pulsarConsumer.close());
+        }
+    }
+
+    protected abstract Message<byte[]> pollMessage(Duration timeout)
+            throws ExecutionException, InterruptedException, TimeoutException;
+
+    protected abstract void finishedPollMessage(Message<byte[]> message);
+
+    protected abstract void startConsumer(PulsarPartitionSplit split, 
Consumer<byte[]> consumer);
+
+    // --------------------------- Helper Methods -----------------------------
+
+    protected void initialStartPosition(PulsarPartitionSplit split, 
Consumer<byte[]> consumer) {
+        StartCursor startCursor = split.getStartCursor();
+        TopicPartition partition = split.getPartition();
+
+        // Seek start consuming position for the newly assigned split.
+        CursorPosition position =
+                Optional.ofNullable(split.getLatestConsumedId())
+                        .map(CursorPosition::new)
+                        .orElseGet(
+                                () -> {
+                                    // If the split don't contains the last 
consumed message id,
+                                    // just use the start cursor's position.
+                                    return startCursor.position(split);
+                                });
+
+        // Validate the start position and assign it to consumer.
+        if (position.getType() == CursorPosition.Type.MESSAGE_ID) {
+            MessageId initialMessageId = position.getMessageId();
+
+            if (!initialMessageId.equals(MessageId.earliest)
+                    && !initialMessageId.equals(MessageId.latest)) {
+                MessageId lastMessageId =
+                        sneakyAdmin(
+                                () ->
+                                        pulsarAdmin
+                                                .topics()
+                                                
.getLastMessageId(partition.getFullTopicName()));
+                if (initialMessageId.compareTo(lastMessageId) > 0) {
+                    CursorVerification verification = 
sourceConfiguration.getVerifyInitialOffsets();
+                    if (verification == FAIL_ON_MISMATCH) {

Review comment:
       Okay, another idea. Maybe we could extract the body of the second `if` 
into an extra method `verifyCursor`? Triple-nested `if` statements are usually hard to 
understand at first.
   I meant everything after `MessageId lastMessageId = ...`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to