This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7397b96 [client] Provide a clock for generating publish timestamp for
producers (#4562)
7397b96 is described below
commit 7397b960d992321fc52f85739d180c7080c632b1
Author: Sijie Guo <[email protected]>
AuthorDate: Tue Jun 25 09:35:54 2019 +0800
[client] Provide a clock for generating publish timestamp for producers
(#4562)
*Motivation*
Currently producers uses `System.currentTimeMillis()` as publish timestamp
by default.
However at some use cases, producers would like to a different way for
generating publish timestamp.
E.g. in a database use case, a producer might be use HLC (Hybrid Logic
Clock) as publish timestamp;
in integration tests, it might require the producer to use a deterministic
way to generate publish timestamp.
*Changes*
This PR introduces a `clock` in building the client. This allows
applications to override the system clock
with its own implementation.
*Verify the change*
Add unit test to test customized clock in both batch and non-batch cases.
---
.../client/api/SimpleProducerConsumerTest.java | 140 +++++++++++++++++++++
.../apache/pulsar/client/api/ClientBuilder.java | 17 +++
.../pulsar/client/impl/ClientBuilderImpl.java | 7 ++
.../apache/pulsar/client/impl/ProducerImpl.java | 2 +-
.../pulsar/client/impl/PulsarClientImpl.java | 17 ++-
.../client/impl/conf/ClientConfigurationData.java | 5 +-
6 files changed, 183 insertions(+), 5 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index e7209a1..130a35b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.spy;
@@ -41,6 +42,9 @@ import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.nio.file.Files;
import java.nio.file.Paths;
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -57,9 +61,11 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
+import lombok.Cleanup;
import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -106,6 +112,140 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
super.internalCleanup();
}
+ @Test
+ public void testPublishTimestampBatchDisabled() throws Exception {
+
+ log.info("-- Starting {} test --", methodName);
+
+ AtomicLong ticker = new AtomicLong(0);
+
+ Clock clock = new Clock() {
+ @Override
+ public ZoneId getZone() {
+ return ZoneId.systemDefault();
+ }
+
+ @Override
+ public Clock withZone(ZoneId zone) {
+ return this;
+ }
+
+ @Override
+ public Instant instant() {
+ return Instant.ofEpochMilli(millis());
+ }
+
+ @Override
+ public long millis() {
+ return ticker.incrementAndGet();
+ }
+ };
+
+ @Cleanup
+ PulsarClient newPulsarClient = PulsarClient.builder()
+ .serviceUrl(lookupUrl.toString())
+ .clock(clock)
+ .build();
+
+ final String topic =
"persistent://my-property/my-ns/test-publish-timestamp";
+
+ @Cleanup
+ Consumer<byte[]> consumer = newPulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName("my-sub")
+ .subscribe();
+
+ @Cleanup
+ Producer<byte[]> producer = newPulsarClient.newProducer()
+ .topic(topic)
+ .enableBatching(false)
+ .create();
+
+ final int numMessages = 5;
+ for (int i = 0; i < numMessages; i++) {
+ producer.newMessage()
+ .value(("value-" + i).getBytes(UTF_8))
+ .eventTime((i + 1) * 100L)
+ .sendAsync();
+ }
+ producer.flush();
+
+ for (int i = 0; i < numMessages; i++) {
+ Message<byte[]> msg = consumer.receive();
+ log.info("Received message '{}'.", new String(msg.getValue(),
UTF_8));
+ assertEquals(1L + i, msg.getPublishTime());
+ assertEquals(100L * (i + 1), msg.getEventTime());
+ }
+ }
+
+ @Test
+ public void testPublishTimestampBatchEnabled() throws Exception {
+
+ log.info("-- Starting {} test --", methodName);
+
+ AtomicLong ticker = new AtomicLong(0);
+
+ Clock clock = new Clock() {
+ @Override
+ public ZoneId getZone() {
+ return ZoneId.systemDefault();
+ }
+
+ @Override
+ public Clock withZone(ZoneId zone) {
+ return this;
+ }
+
+ @Override
+ public Instant instant() {
+ return Instant.ofEpochMilli(millis());
+ }
+
+ @Override
+ public long millis() {
+ return ticker.incrementAndGet();
+ }
+ };
+
+ @Cleanup
+ PulsarClient newPulsarClient = PulsarClient.builder()
+ .serviceUrl(lookupUrl.toString())
+ .clock(clock)
+ .build();
+
+ final String topic =
"persistent://my-property/my-ns/test-publish-timestamp";
+
+ @Cleanup
+ Consumer<byte[]> consumer = newPulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName("my-sub")
+ .subscribe();
+
+ final int numMessages = 5;
+
+ @Cleanup
+ Producer<byte[]> producer = newPulsarClient.newProducer()
+ .topic(topic)
+ .enableBatching(true)
+ .batchingMaxMessages(10 * numMessages)
+ .create();
+
+ for (int i = 0; i < numMessages; i++) {
+ producer.newMessage()
+ .value(("value-" + i).getBytes(UTF_8))
+ .eventTime((i + 1) * 100L)
+ .sendAsync();
+ }
+ producer.flush();
+
+ for (int i = 0; i < numMessages; i++) {
+ Message<byte[]> msg = consumer.receive();
+ log.info("Received message '{}'.", new String(msg.getValue(),
UTF_8));
+ assertEquals(1L, msg.getPublishTime());
+ assertEquals(100L * (i + 1), msg.getEventTime());
+ }
+ }
+
@DataProvider(name = "batch")
public Object[][] codecProvider() {
return new Object[][] { { 0 }, { 1000 } };
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
index eb02c3f..9e6cc7c 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api;
+import java.time.Clock;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -376,4 +377,20 @@ public interface ClientBuilder extends Cloneable {
* @return the client builder instance
*/
ClientBuilder maxBackoffInterval(long duration, TimeUnit unit);
+
+ /**
+ * The clock used by the pulsar client.
+ *
+ * <p>The clock is currently used by producer for setting publish
timestamps.
+ * {@link Clock#millis()} is called to retrieve current timestamp as the
publish
+ * timestamp when producers produce messages. The default clock is a
system default zone
+ * clock. So the publish timestamp is same as calling {@link
System#currentTimeMillis()}.
+ *
+ * <p>Warning: the clock is used for TTL enforcement and timestamp based
seeks.
+ * so be aware of the impacts if you are going to use a different clock.
+ *
+ * @param clock the clock used by the pulsar client to retrieve time
information
+ * @return the client builder instance
+ */
+ ClientBuilder clock(Clock clock);
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index 013e94d..c65b7c2 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl;
+import java.time.Clock;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -223,4 +224,10 @@ public class ClientBuilderImpl implements ClientBuilder {
public ClientConfigurationData getClientConfigurationData() {
return conf;
}
+
+ @Override
+ public ClientBuilder clock(Clock clock) {
+ conf.setClock(clock);
+ return this;
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 48656f8..7d446a0 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -352,7 +352,7 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
sequenceId = msgMetadataBuilder.getSequenceId();
}
if (!msgMetadataBuilder.hasPublishTime()) {
-
msgMetadataBuilder.setPublishTime(System.currentTimeMillis());
+
msgMetadataBuilder.setPublishTime(client.getClientClock().millis());
checkArgument(!msgMetadataBuilder.hasProducerName());
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 764d25c..d345da5 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.client.impl;
-import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.commons.lang3.StringUtils.isBlank;
import com.google.common.annotations.VisibleForTesting;
@@ -33,7 +32,12 @@ import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
-import java.util.*;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -69,7 +73,6 @@ import
org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
import
org.apache.pulsar.client.impl.schema.generic.MultiVersionSchemaInfoProvider;
import org.apache.pulsar.client.util.ExecutorProvider;
-import org.apache.pulsar.common.api.proto.PulsarApi;
import
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
@@ -115,6 +118,8 @@ public class PulsarClientImpl implements PulsarClient {
}
});
+ private final Clock clientClock;
+
public PulsarClientImpl(ClientConfigurationData conf) throws
PulsarClientException {
this(conf, getEventLoopGroup(conf));
}
@@ -131,6 +136,7 @@ public class PulsarClientImpl implements PulsarClient {
this.eventLoopGroup = eventLoopGroup;
setAuth(conf);
this.conf = conf;
+ this.clientClock = conf.getClock();
conf.getAuthentication().start();
this.cnxPool = cnxPool;
externalExecutorProvider = new
ExecutorProvider(conf.getNumListenerThreads(),
getThreadFactory("pulsar-external-listener"));
@@ -157,6 +163,11 @@ public class PulsarClientImpl implements PulsarClient {
return conf;
}
+ @VisibleForTesting
+ public Clock getClientClock() {
+ return clientClock;
+ }
+
@Override
public ProducerBuilder<byte[]> newProducer() {
return new ProducerBuilderImpl<>(this, Schema.BYTES);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index 3ab8638..e944896 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -19,8 +19,8 @@
package org.apache.pulsar.client.impl.conf;
import com.fasterxml.jackson.annotation.JsonIgnore;
+import java.time.Clock;
import lombok.AllArgsConstructor;
-import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.pulsar.client.api.Authentication;
@@ -70,6 +70,9 @@ public class ClientConfigurationData implements Serializable,
Cloneable {
private long defaultBackoffIntervalNanos =
TimeUnit.MILLISECONDS.toNanos(100);
private long maxBackoffIntervalNanos = TimeUnit.SECONDS.toNanos(30);
+ @JsonIgnore
+ private Clock clock = Clock.systemDefaultZone();
+
public Authentication getAuthentication() {
if (authentication == null) {
this.authentication = new AuthenticationDisabled();