This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4b378a2 [pulsar-flink] Add subscription initial position (#4129)
4b378a2 is described below
commit 4b378a289170403bbead97a68623a12b11576039
Author: lipenghui <[email protected]>
AuthorDate: Fri Apr 26 23:38:17 2019 +0800
[pulsar-flink] Add subscription initial position (#4129)
### Motivation
Allow users to specify the subscription initial position in the consumer source builder.
### Modifications
Add a subscription initial position option to PulsarSourceBuilder and apply it in PulsarConsumerSource.
---
.../streaming/connectors/pulsar/PulsarConsumerSource.java | 5 +++++
.../streaming/connectors/pulsar/PulsarSourceBuilder.java | 14 ++++++++++++++
.../connectors/pulsar/PulsarSourceBuilderTest.java | 7 +++++++
3 files changed, 26 insertions(+)
diff --git
a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
index 5af82bc..046c7d3 100644
---
a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
+++
b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.Authentication;
import org.slf4j.Logger;
@@ -71,6 +72,7 @@ class PulsarConsumerSource<T> extends
MessageAcknowledgingSourceBase<T, MessageI
private final long acknowledgementBatchSize;
private long batchCount;
+ private final SubscriptionInitialPosition initialPosition;
private transient volatile boolean isRunning;
@@ -83,6 +85,7 @@ class PulsarConsumerSource<T> extends
MessageAcknowledgingSourceBase<T, MessageI
this.deserializer = builder.deserializationSchema;
this.subscriptionName = builder.subscriptionName;
this.acknowledgementBatchSize = builder.acknowledgementBatchSize;
+ this.initialPosition = builder.initialPosition;
}
@Override
@@ -203,12 +206,14 @@ class PulsarConsumerSource<T> extends
MessageAcknowledgingSourceBase<T, MessageI
return client.newConsumer().topicsPattern(topicsPattern)
.subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Failover)
+ .subscriptionInitialPosition(initialPosition)
.subscribe();
} else {
return client.newConsumer()
.topics(Lists.newArrayList(topicNames))
.subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Failover)
+ .subscriptionInitialPosition(initialPosition)
.subscribe();
}
}
diff --git
a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
index 3b78495..4ca8361 100644
---
a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
+++
b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
@@ -26,6 +26,7 @@ import org.apache.flink.util.Preconditions;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import java.util.Arrays;
import java.util.List;
@@ -51,6 +52,7 @@ public class PulsarSourceBuilder<T> {
Pattern topicsPattern;
String subscriptionName = "flink-sub";
long acknowledgementBatchSize = ACKNOWLEDGEMENT_BATCH_SIZE;
+ SubscriptionInitialPosition initialPosition =
SubscriptionInitialPosition.Latest;
private PulsarSourceBuilder(DeserializationSchema<T>
deserializationSchema) {
this.deserializationSchema = deserializationSchema;
@@ -154,6 +156,18 @@ public class PulsarSourceBuilder<T> {
}
/**
+ * Sets the subscription initial position for the topic consumer. Default
is {@link SubscriptionInitialPosition#Latest}
+ *
+ * @param initialPosition the subscription initial position.
+ * @return this builder
+ */
+ public PulsarSourceBuilder<T>
subscriptionInitialPosition(SubscriptionInitialPosition initialPosition) {
+ Preconditions.checkNotNull(initialPosition,"subscription initial
position cannot be null");
+ this.initialPosition = initialPosition;
+ return this;
+ }
+
+ /**
* Sets the number of messages to receive before acknowledging. This
defaults to 100. This
* value is only used when checkpointing is disabled.
*
diff --git
a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java
b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java
index 4f59b6e..67a2433 100644
---
a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java
+++
b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.connectors.pulsar;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -47,6 +48,7 @@ public class PulsarSourceBuilderTest {
.serviceUrl("testServiceUrl")
.topic("testTopic")
.subscriptionName("testSubscriptionName")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.build();
Assert.assertNotNull(sourceFunction);
}
@@ -112,6 +114,11 @@ public class PulsarSourceBuilderTest {
pulsarSourceBuilder.subscriptionName(" ");
}
+ @Test(expectedExceptions = NullPointerException.class)
+ public void testSubscriptionInitialPosition() {
+ pulsarSourceBuilder.subscriptionInitialPosition(null);
+ }
+
private class TestDeserializationSchema<T> implements
DeserializationSchema<T> {
@Override