This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7397b96  [client] Provide a clock for generating publish timestamp for 
producers (#4562)
7397b96 is described below

commit 7397b960d992321fc52f85739d180c7080c632b1
Author: Sijie Guo <[email protected]>
AuthorDate: Tue Jun 25 09:35:54 2019 +0800

    [client] Provide a clock for generating publish timestamp for producers 
(#4562)
    
    *Motivation*
    
    Currently producers use `System.currentTimeMillis()` as the publish 
timestamp by default.
    However, in some use cases, producers would like to use a different way of 
generating the publish timestamp.
    E.g. in a database use case, a producer might use an HLC (Hybrid Logical 
Clock) as the publish timestamp;
    in integration tests, the producer might be required to use a deterministic 
way to generate the publish timestamp.
    
    *Changes*
    
    This PR introduces a `clock` option for building the client. This allows 
applications to override the system clock
    with their own implementation.
    
    *Verify the change*
    
    Add unit tests to verify a customized clock in both batch and non-batch cases.
---
 .../client/api/SimpleProducerConsumerTest.java     | 140 +++++++++++++++++++++
 .../apache/pulsar/client/api/ClientBuilder.java    |  17 +++
 .../pulsar/client/impl/ClientBuilderImpl.java      |   7 ++
 .../apache/pulsar/client/impl/ProducerImpl.java    |   2 +-
 .../pulsar/client/impl/PulsarClientImpl.java       |  17 ++-
 .../client/impl/conf/ClientConfigurationData.java  |   5 +-
 6 files changed, 183 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index e7209a1..130a35b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.api;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.spy;
@@ -41,6 +42,9 @@ import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -57,9 +61,11 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
+import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -106,6 +112,140 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
         super.internalCleanup();
     }
 
+    @Test
+    public void testPublishTimestampBatchDisabled() throws Exception {
+
+        log.info("-- Starting {} test --", methodName);
+
+        AtomicLong ticker = new AtomicLong(0);
+
+        Clock clock = new Clock() {
+            @Override
+            public ZoneId getZone() {
+                return ZoneId.systemDefault();
+            }
+
+            @Override
+            public Clock withZone(ZoneId zone) {
+                return this;
+            }
+
+            @Override
+            public Instant instant() {
+                return Instant.ofEpochMilli(millis());
+            }
+
+            @Override
+            public long millis() {
+                return ticker.incrementAndGet();
+            }
+        };
+
+        @Cleanup
+        PulsarClient newPulsarClient = PulsarClient.builder()
+            .serviceUrl(lookupUrl.toString())
+            .clock(clock)
+            .build();
+
+        final String topic = 
"persistent://my-property/my-ns/test-publish-timestamp";
+
+        @Cleanup
+        Consumer<byte[]> consumer = newPulsarClient.newConsumer()
+            .topic(topic)
+            .subscriptionName("my-sub")
+            .subscribe();
+
+        @Cleanup
+        Producer<byte[]> producer = newPulsarClient.newProducer()
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+
+        final int numMessages = 5;
+        for (int i = 0; i < numMessages; i++) {
+            producer.newMessage()
+                .value(("value-" + i).getBytes(UTF_8))
+                .eventTime((i + 1) * 100L)
+                .sendAsync();
+        }
+        producer.flush();
+
+        for (int i = 0; i < numMessages; i++) {
+            Message<byte[]> msg = consumer.receive();
+            log.info("Received message '{}'.", new String(msg.getValue(), 
UTF_8));
+            assertEquals(1L + i, msg.getPublishTime());
+            assertEquals(100L * (i + 1), msg.getEventTime());
+        }
+    }
+
+    @Test
+    public void testPublishTimestampBatchEnabled() throws Exception {
+
+        log.info("-- Starting {} test --", methodName);
+
+        AtomicLong ticker = new AtomicLong(0);
+
+        Clock clock = new Clock() {
+            @Override
+            public ZoneId getZone() {
+                return ZoneId.systemDefault();
+            }
+
+            @Override
+            public Clock withZone(ZoneId zone) {
+                return this;
+            }
+
+            @Override
+            public Instant instant() {
+                return Instant.ofEpochMilli(millis());
+            }
+
+            @Override
+            public long millis() {
+                return ticker.incrementAndGet();
+            }
+        };
+
+        @Cleanup
+        PulsarClient newPulsarClient = PulsarClient.builder()
+            .serviceUrl(lookupUrl.toString())
+            .clock(clock)
+            .build();
+
+        final String topic = 
"persistent://my-property/my-ns/test-publish-timestamp";
+
+        @Cleanup
+        Consumer<byte[]> consumer = newPulsarClient.newConsumer()
+            .topic(topic)
+            .subscriptionName("my-sub")
+            .subscribe();
+
+        final int numMessages = 5;
+
+        @Cleanup
+        Producer<byte[]> producer = newPulsarClient.newProducer()
+                .topic(topic)
+                .enableBatching(true)
+                .batchingMaxMessages(10 * numMessages)
+                .create();
+
+        for (int i = 0; i < numMessages; i++) {
+            producer.newMessage()
+                .value(("value-" + i).getBytes(UTF_8))
+                .eventTime((i + 1) * 100L)
+                .sendAsync();
+        }
+        producer.flush();
+
+        for (int i = 0; i < numMessages; i++) {
+            Message<byte[]> msg = consumer.receive();
+            log.info("Received message '{}'.", new String(msg.getValue(), 
UTF_8));
+            assertEquals(1L, msg.getPublishTime());
+            assertEquals(100L * (i + 1), msg.getEventTime());
+        }
+    }
+
     @DataProvider(name = "batch")
     public Object[][] codecProvider() {
         return new Object[][] { { 0 }, { 1000 } };
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
index eb02c3f..9e6cc7c 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.api;
 
+import java.time.Clock;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
@@ -376,4 +377,20 @@ public interface ClientBuilder extends Cloneable {
      * @return the client builder instance
      */
     ClientBuilder maxBackoffInterval(long duration, TimeUnit unit);
+
+    /**
+     * The clock used by the pulsar client.
+     *
+     * <p>The clock is currently used by producer for setting publish 
timestamps.
+     * {@link Clock#millis()} is called to retrieve current timestamp as the 
publish
+     * timestamp when producers produce messages. The default clock is a 
system default zone
+     * clock. So the publish timestamp is same as calling {@link 
System#currentTimeMillis()}.
+     *
+     * <p>Warning: the clock is used for TTL enforcement and timestamp based 
seeks.
+     * so be aware of the impacts if you are going to use a different clock.
+     *
+     * @param clock the clock used by the pulsar client to retrieve time 
information
+     * @return the client builder instance
+     */
+    ClientBuilder clock(Clock clock);
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index 013e94d..c65b7c2 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.impl;
 
+import java.time.Clock;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
@@ -223,4 +224,10 @@ public class ClientBuilderImpl implements ClientBuilder {
     public ClientConfigurationData getClientConfigurationData() {
         return conf;
     }
+
+    @Override
+    public ClientBuilder clock(Clock clock) {
+        conf.setClock(clock);
+        return this;
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 48656f8..7d446a0 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -352,7 +352,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                     sequenceId = msgMetadataBuilder.getSequenceId();
                 }
                 if (!msgMetadataBuilder.hasPublishTime()) {
-                    
msgMetadataBuilder.setPublishTime(System.currentTimeMillis());
+                    
msgMetadataBuilder.setPublishTime(client.getClientClock().millis());
 
                     checkArgument(!msgMetadataBuilder.hasProducerName());
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 764d25c..d345da5 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.client.impl;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -33,7 +32,12 @@ import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
 import io.netty.util.concurrent.DefaultThreadFactory;
 
-import java.util.*;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -69,7 +73,6 @@ import 
org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema;
 import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
 import 
org.apache.pulsar.client.impl.schema.generic.MultiVersionSchemaInfoProvider;
 import org.apache.pulsar.client.util.ExecutorProvider;
-import org.apache.pulsar.common.api.proto.PulsarApi;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
@@ -115,6 +118,8 @@ public class PulsarClientImpl implements PulsarClient {
                 }
             });
 
+    private final Clock clientClock;
+
     public PulsarClientImpl(ClientConfigurationData conf) throws 
PulsarClientException {
         this(conf, getEventLoopGroup(conf));
     }
@@ -131,6 +136,7 @@ public class PulsarClientImpl implements PulsarClient {
         this.eventLoopGroup = eventLoopGroup;
         setAuth(conf);
         this.conf = conf;
+        this.clientClock = conf.getClock();
         conf.getAuthentication().start();
         this.cnxPool = cnxPool;
         externalExecutorProvider = new 
ExecutorProvider(conf.getNumListenerThreads(), 
getThreadFactory("pulsar-external-listener"));
@@ -157,6 +163,11 @@ public class PulsarClientImpl implements PulsarClient {
         return conf;
     }
 
+    @VisibleForTesting
+    public Clock getClientClock() {
+        return clientClock;
+    }
+
     @Override
     public ProducerBuilder<byte[]> newProducer() {
         return new ProducerBuilderImpl<>(this, Schema.BYTES);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index 3ab8638..e944896 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -19,8 +19,8 @@
 package org.apache.pulsar.client.impl.conf;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
+import java.time.Clock;
 import lombok.AllArgsConstructor;
-import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.pulsar.client.api.Authentication;
@@ -70,6 +70,9 @@ public class ClientConfigurationData implements Serializable, 
Cloneable {
     private long defaultBackoffIntervalNanos = 
TimeUnit.MILLISECONDS.toNanos(100);
     private long maxBackoffIntervalNanos = TimeUnit.SECONDS.toNanos(30);
 
+    @JsonIgnore
+    private Clock clock = Clock.systemDefaultZone();
+
     public Authentication getAuthentication() {
         if (authentication == null) {
             this.authentication = new AuthenticationDisabled();

Reply via email to