This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git


The following commit(s) were added to refs/heads/main by this push:
     new 6991f38  [FLINK-28870][Connector/Pulsar] Improve the Pulsar source 
performance when meeting small data rates. (#15)
6991f38 is described below

commit 6991f383228b59cd86b874490aca3f190c59d168
Author: Yufan Sheng <[email protected]>
AuthorDate: Tue Jan 10 16:39:58 2023 +0800

    [FLINK-28870][Connector/Pulsar] Improve the Pulsar source performance when 
meeting small data rates. (#15)
---
 .../generated/pulsar_source_configuration.html     |  6 ++++++
 .../pulsar/source/PulsarSourceOptions.java         | 19 +++++++++++++++++
 .../pulsar/source/config/SourceConfiguration.java  | 19 +++++++++++++++--
 .../source/reader/PulsarPartitionSplitReader.java  | 24 ++++++++--------------
 .../source/enumerator/cursor/StopCursorTest.java   |  4 +++-
 .../reader/PulsarPartitionSplitReaderTest.java     |  6 ++++--
 .../source/reader/PulsarSourceReaderTest.java      |  4 +++-
 7 files changed, 61 insertions(+), 21 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/pulsar_source_configuration.html 
b/docs/layouts/shortcodes/generated/pulsar_source_configuration.html
index 7682394..94c2bd8 100644
--- a/docs/layouts/shortcodes/generated/pulsar_source_configuration.html
+++ b/docs/layouts/shortcodes/generated/pulsar_source_configuration.html
@@ -38,6 +38,12 @@
             <td>Boolean</td>
             <td>The metrics from Pulsar Consumer are only exposed if you 
enable this option.You should set the <code 
class="highlighter-rouge">pulsar.client.statsIntervalSeconds</code> to a 
positive value if you enable this option.</td>
         </tr>
+        <tr>
+            <td><h5>pulsar.source.fetchOneMessageTime</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Integer</td>
+            <td>The time (in ms) for fetching one message from Pulsar. If time 
exceed and no message returned from Pulsar. We would consider there is no 
record at the current topic partition and stop fetching until next switch.<br 
/>It's not configured by default. We will use the remaining time in <code 
class="highlighter-rouge">pulsar.source.maxFetchTime</code> by default, which 
may cause a long wait in small message rates. Add this option in source builder 
avoiding waiting too long.</td>
+        </tr>
         <tr>
             <td><h5>pulsar.source.maxFetchRecords</h5></td>
             <td style="word-wrap: break-word;">100</td>
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
index a74f80f..fa42ccf 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
@@ -114,6 +114,25 @@ public final class PulsarSourceOptions {
                                             " We would automatically commit 
the cursor using the given period (in ms).")
                                     .build());
 
+    public static final ConfigOption<Integer> PULSAR_FETCH_ONE_MESSAGE_TIME =
+            ConfigOptions.key(SOURCE_CONFIG_PREFIX + "fetchOneMessageTime")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The time (in ms) for fetching one 
message from Pulsar. If time exceed and no message returned from Pulsar.")
+                                    .text(
+                                            " We would consider there is no 
record at the current topic partition and stop fetching until next switch.")
+                                    .linebreak()
+                                    .text(
+                                            "It's not configured by default. 
We will use the remaining time in %s by default,",
+                                            code("pulsar.source.maxFetchTime"))
+                                    .text(" which may cause a long wait in 
small message rates.")
+                                    .text(
+                                            " Add this option in source 
builder avoiding waiting too long.")
+                                    .build());
+
     public static final ConfigOption<Long> PULSAR_MAX_FETCH_TIME =
             ConfigOptions.key(SOURCE_CONFIG_PREFIX + "maxFetchTime")
                     .longType()
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java
index 96e886a..7fb5e10 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java
@@ -39,6 +39,7 @@ import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSA
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_AUTO_COMMIT_CURSOR_INTERVAL;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_SOURCE_METRICS;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_FETCH_ONE_MESSAGE_TIME;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_TIME;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;
@@ -56,6 +57,7 @@ public class SourceConfiguration extends PulsarConfiguration {
     private final long partitionDiscoveryIntervalMs;
     private final boolean enableAutoAcknowledgeMessage;
     private final long autoCommitCursorInterval;
+    private final int fetchOneMessageTime;
     private final Duration maxFetchTime;
     private final int maxFetchRecords;
     private final CursorVerification verifyInitialOffsets;
@@ -72,6 +74,7 @@ public class SourceConfiguration extends PulsarConfiguration {
         this.partitionDiscoveryIntervalMs = 
get(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS);
         this.enableAutoAcknowledgeMessage = 
get(PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE);
         this.autoCommitCursorInterval = 
get(PULSAR_AUTO_COMMIT_CURSOR_INTERVAL);
+        this.fetchOneMessageTime = 
getOptional(PULSAR_FETCH_ONE_MESSAGE_TIME).orElse(0);
         this.maxFetchTime = get(PULSAR_MAX_FETCH_TIME, Duration::ofMillis);
         this.maxFetchRecords = get(PULSAR_MAX_FETCH_RECORDS);
         this.verifyInitialOffsets = get(PULSAR_VERIFY_INITIAL_OFFSETS);
@@ -125,6 +128,14 @@ public class SourceConfiguration extends 
PulsarConfiguration {
         return autoCommitCursorInterval;
     }
 
+    /**
+     * The fetch time (in ms) for polling one message. We stop polling and return the messages
+     * in {@link RecordsWithSplitIds} when this timeout elapses and no message was consumed.
+     */
+    public int getFetchOneMessageTime() {
+        return fetchOneMessageTime;
+    }
+
     /**
      * The fetch time for flink split reader polling message. We would stop 
polling message and
      * return the message in {@link RecordsWithSplitIds} when timeout or 
exceed the {@link
@@ -202,11 +213,13 @@ public class SourceConfiguration extends 
PulsarConfiguration {
             return false;
         }
         SourceConfiguration that = (SourceConfiguration) o;
-        return partitionDiscoveryIntervalMs == 
that.partitionDiscoveryIntervalMs
+        return messageQueueCapacity == that.messageQueueCapacity
+                && partitionDiscoveryIntervalMs == 
that.partitionDiscoveryIntervalMs
                 && enableAutoAcknowledgeMessage == 
that.enableAutoAcknowledgeMessage
                 && autoCommitCursorInterval == that.autoCommitCursorInterval
-                && maxFetchRecords == that.maxFetchRecords
+                && fetchOneMessageTime == that.fetchOneMessageTime
                 && Objects.equals(maxFetchTime, that.maxFetchTime)
+                && maxFetchRecords == that.maxFetchRecords
                 && verifyInitialOffsets == that.verifyInitialOffsets
                 && Objects.equals(subscriptionName, that.subscriptionName)
                 && subscriptionMode == that.subscriptionMode
@@ -219,9 +232,11 @@ public class SourceConfiguration extends 
PulsarConfiguration {
     public int hashCode() {
         return Objects.hash(
                 super.hashCode(),
+                messageQueueCapacity,
                 partitionDiscoveryIntervalMs,
                 enableAutoAcknowledgeMessage,
                 autoCommitCursorInterval,
+                fetchOneMessageTime,
                 maxFetchTime,
                 maxFetchRecords,
                 verifyInitialOffsets,
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java
index 638e4c7..6ba3274 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java
@@ -50,14 +50,12 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageCrypto;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.time.Duration;
 import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
@@ -121,6 +119,7 @@ public class PulsarPartitionSplitReader
     }
 
     @Override
+    @SuppressWarnings("java:S135")
     public RecordsWithSplitIds<Message<byte[]>> fetch() throws IOException {
         RecordsBySplits.Builder<Message<byte[]>> builder = new 
RecordsBySplits.Builder<>();
 
@@ -138,8 +137,12 @@ public class PulsarPartitionSplitReader
                 messageNum < sourceConfiguration.getMaxFetchRecords() && 
deadline.hasTimeLeft();
                 messageNum++) {
             try {
-                Duration timeout = deadline.timeLeftIfAny();
-                Message<byte[]> message = pollMessage(timeout);
+                int fetchTime = sourceConfiguration.getFetchOneMessageTime();
+                if (fetchTime <= 0) {
+                    fetchTime = (int) deadline.timeLeftIfAny().toMillis();
+                }
+
+                Message<byte[]> message = pulsarConsumer.receive(fetchTime, 
TimeUnit.MILLISECONDS);
                 if (message == null) {
                     break;
                 }
@@ -228,7 +231,7 @@ public class PulsarPartitionSplitReader
         }
 
         // Create pulsar consumer.
-        this.pulsarConsumer = createPulsarConsumer(registeredSplit);
+        this.pulsarConsumer = 
createPulsarConsumer(registeredSplit.getPartition());
 
         LOG.info("Register split {} consumer for current reader.", 
registeredSplit);
     }
@@ -261,10 +264,6 @@ public class PulsarPartitionSplitReader
         }
     }
 
-    protected Message<byte[]> pollMessage(Duration timeout) throws 
PulsarClientException {
-        return pulsarConsumer.receive(Math.toIntExact(timeout.toMillis()), 
TimeUnit.MILLISECONDS);
-    }
-
     public void notifyCheckpointComplete(TopicPartition partition, MessageId 
offsetsToCommit) {
         if (pulsarConsumer == null) {
             this.pulsarConsumer = createPulsarConsumer(partition);
@@ -275,13 +274,8 @@ public class PulsarPartitionSplitReader
 
     // --------------------------- Helper Methods -----------------------------
 
-    /** Create a specified {@link Consumer} by the given split information. */
-    protected Consumer<byte[]> createPulsarConsumer(PulsarPartitionSplit 
split) {
-        return createPulsarConsumer(split.getPartition());
-    }
-
     /** Create a specified {@link Consumer} by the given topic partition. */
-    protected Consumer<byte[]> createPulsarConsumer(TopicPartition partition) {
+    private Consumer<byte[]> createPulsarConsumer(TopicPartition partition) {
         ConsumerBuilder<byte[]> consumerBuilder =
                 createConsumerBuilder(pulsarClient, schema, 
sourceConfiguration);
 
diff --git 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
index d82aad3..9e13b15 100644
--- 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
+++ 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
@@ -39,6 +39,7 @@ import static java.util.Collections.singletonList;
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_FETCH_ONE_MESSAGE_TIME;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_TIME;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
@@ -97,7 +98,8 @@ class StopCursorTest extends PulsarTestSuiteBase {
     private SourceConfiguration sourceConfig() {
         Configuration config = operator().config();
         config.set(PULSAR_MAX_FETCH_RECORDS, 1);
-        config.set(PULSAR_MAX_FETCH_TIME, 1000L);
+        config.set(PULSAR_FETCH_ONE_MESSAGE_TIME, 2000);
+        config.set(PULSAR_MAX_FETCH_TIME, 3000L);
         config.set(PULSAR_SUBSCRIPTION_NAME, randomAlphabetic(10));
         config.set(PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true);
         return new SourceConfiguration(config);
diff --git 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java
 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java
index 6902a7e..9d5f65b 100644
--- 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java
+++ 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java
@@ -48,6 +48,7 @@ import static java.util.Collections.singletonList;
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
 import static 
org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyAdmin;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_FETCH_ONE_MESSAGE_TIME;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_TIME;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
@@ -277,11 +278,12 @@ class PulsarPartitionSplitReaderTest extends 
PulsarTestSuiteBase {
                 createSourceReaderMetricGroup());
     }
 
-    /** Default source config: max message 1, fetch timeout 1s. */
+    /** Default source config: max message 1, per-message fetch timeout 2s, max fetch time 3s. */
     private SourceConfiguration sourceConfig() {
         Configuration config = operator().config();
         config.set(PULSAR_MAX_FETCH_RECORDS, 1);
-        config.set(PULSAR_MAX_FETCH_TIME, 1000L);
+        config.set(PULSAR_FETCH_ONE_MESSAGE_TIME, 2000);
+        config.set(PULSAR_MAX_FETCH_TIME, 3000L);
         config.set(PULSAR_SUBSCRIPTION_NAME, randomAlphabetic(10));
         config.set(PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true);
 
diff --git 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderTest.java
 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderTest.java
index e805d14..8a69992 100644
--- 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderTest.java
+++ 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderTest.java
@@ -61,6 +61,7 @@ import java.util.function.Supplier;
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_FETCH_ONE_MESSAGE_TIME;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_TIME;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
@@ -229,7 +230,8 @@ class PulsarSourceReaderTest extends PulsarTestSuiteBase {
         Configuration configuration = operator().config();
 
         configuration.set(PULSAR_MAX_FETCH_RECORDS, 1);
-        configuration.set(PULSAR_MAX_FETCH_TIME, 1000L);
+        configuration.set(PULSAR_FETCH_ONE_MESSAGE_TIME, 2000);
+        configuration.set(PULSAR_MAX_FETCH_TIME, 3000L);
         configuration.set(PULSAR_SUBSCRIPTION_NAME, randomAlphabetic(10));
 
         PulsarDeserializationSchema<Integer> deserializationSchema =

Reply via email to