This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f13af48  Allow to configure ack-timeout tick time (#4760)
f13af48 is described below

commit f13af487699dcb36b7458ff4872a771837feefc6
Author: Matteo Merli <[email protected]>
AuthorDate: Sun Jul 21 01:51:05 2019 -0700

    Allow to configure ack-timeout tick time (#4760)
    
    ### Motivation
    
    After the changes in #3118, there has been a sharp increase in memory 
utilization for the UnAckedMessageTracker due to the time buckets being created.
    
    This is especially true when the ack timeout is set to a larger value (e.g. 
1h), where 3600 time buckets are created. This leads to using 20MB per 
partition even when no messages are tracked.
    
    This change allows the tick time to be configured so that applications can 
tune it based on their needs.
    
    Additionally, this fixes the logic that kept creating hash maps and throwing 
them away at each tick-time iteration, since that creates a lot of garbage and 
doesn't take advantage of the fact that the hash maps expand to the 
required capacity (so next time they are already of the "right" size).
    
    On a final note: the current default of 1sec seems very wasteful. Something 
like 10s should be more appropriate as default.
---
 .../apache/pulsar/client/api/ConsumerBuilder.java  | 25 ++++++++++++++----
 .../pulsar/client/impl/ConsumerBuilderImpl.java    |  9 +++++++
 .../pulsar/client/impl/UnAckedMessageTracker.java  | 30 ++++++++++++++--------
 .../pulsar/sql/presto/PulsarConnectorCache.java    |  2 +-
 .../pulsar/sql/presto/PulsarSplitManager.java      |  2 +-
 5 files changed, 51 insertions(+), 17 deletions(-)

diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index f8c5e0a..c4bf5a9 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -170,7 +170,7 @@ public interface ConsumerBuilder<T> extends Cloneable {
 
     /**
      * Set the timeout for unacked messages, truncated to the nearest 
millisecond. The timeout needs to be greater than
-     * 10 seconds.
+     * 1 second.
      * <p>
      * By default, the acknowledge timeout is disabled and that means that 
messages delivered to a
      * consumer will not be re-delivered unless the consumer crashes.
@@ -188,6 +188,21 @@ public interface ConsumerBuilder<T> extends Cloneable {
     ConsumerBuilder<T> ackTimeout(long ackTimeout, TimeUnit timeUnit);
 
     /**
+     * Define the granularity of the ack-timeout redelivery.
+     * <p>
+     * By default, the tick time is set to 1 second. Using an higher tick time 
will
+     * reduce the memory overhead to track messages when the ack-timeout is 
set to
+     * bigger values (eg: 1hour).
+     *
+     * @param tickTime
+     *            the min precision for the ack timeout messages tracker
+     * @param timeUnit
+     *            unit in which the timeout is provided.
+     * @return the consumer builder instance
+     */
+    ConsumerBuilder<T> ackTimeoutTickTime(long tickTime, TimeUnit timeUnit);
+
+    /**
      * Set the delay to wait before re-delivering messages that have failed to 
be process.
      * <p>
      * When application uses {@link Consumer#negativeAcknowledge(Message)}, 
the failed message
@@ -386,7 +401,7 @@ public interface ConsumerBuilder<T> extends Cloneable {
      * C5       1             1
      * Order in which broker dispatches messages to consumers: C1, C2, C3, C1, 
C4, C5, C4
      * </pre>
-     * 
+     *
      * <b>Failover subscription</b>
      * Broker selects active consumer for a failover-subscription based on 
consumer's priority-level and lexicographical sorting of a consumer name.
      * eg:
@@ -395,15 +410,15 @@ public interface ConsumerBuilder<T> extends Cloneable {
      * Consumer PriorityLevel Name
      * C1       0             aaa
      * C2       0             bbb
-     * 
+     *
      * 2. Active consumer = C2 : Consumer with highest priority
      * Consumer PriorityLevel Name
      * C1       1             aaa
      * C2       0             bbb
-     * 
+     *
      * Partitioned-topics:
      * Broker evenly assigns partitioned topics to highest priority consumers.
-     * 
+     *
      * </pre>
      *
      * @param priorityLevel the priority of this consumer
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 6913442..9eed128 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -58,6 +58,7 @@ public class ConsumerBuilderImpl<T> implements 
ConsumerBuilder<T> {
     private List<ConsumerInterceptor<T>> interceptorList;
 
     private static long MIN_ACK_TIMEOUT_MILLIS = 1000;
+    private static long MIN_TICK_TIME_MILLIS = 100;
     private static long DEFAULT_ACK_TIMEOUT_MILLIS_FOR_DEAD_LETTER = 30000L;
 
 
@@ -157,6 +158,14 @@ public class ConsumerBuilderImpl<T> implements 
ConsumerBuilder<T> {
     }
 
     @Override
+    public ConsumerBuilder<T> ackTimeoutTickTime(long tickTime, TimeUnit 
timeUnit) {
+        checkArgument(timeUnit.toMillis(tickTime) >= MIN_TICK_TIME_MILLIS,
+                "Ack timeout tick time should be greater than " + 
MIN_TICK_TIME_MILLIS + " ms");
+        conf.setTickDurationMillis(timeUnit.toMillis(tickTime));
+        return this;
+    }
+
+    @Override
     public ConsumerBuilder<T> negativeAckRedeliveryDelay(long redeliveryDelay, 
TimeUnit timeUnit) {
         checkArgument(redeliveryDelay >= 0, "redeliveryDelay needs to be >= 
0");
         
conf.setNegativeAckRedeliveryDelayMicros(timeUnit.toMicros(redeliveryDelay));
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
index 0ab17d9..aa576b5 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
@@ -21,12 +21,15 @@ package org.apache.pulsar.client.impl;
 import com.google.common.base.Preconditions;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
+import io.netty.util.concurrent.FastThreadLocal;
+
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
+import java.util.ArrayDeque;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -40,7 +43,7 @@ public class UnAckedMessageTracker implements Closeable {
     private static final Logger log = 
LoggerFactory.getLogger(UnAckedMessageTracker.class);
 
     protected final ConcurrentHashMap<MessageId, 
ConcurrentOpenHashSet<MessageId>> messageIdPartitionMap;
-    protected final LinkedList<ConcurrentOpenHashSet<MessageId>> 
timePartitions;
+    protected final ArrayDeque<ConcurrentOpenHashSet<MessageId>> 
timePartitions;
 
     protected final Lock readLock;
     protected final Lock writeLock;
@@ -94,6 +97,13 @@ public class UnAckedMessageTracker implements Closeable {
         this(client, consumerBase, ackTimeoutMillis, ackTimeoutMillis);
     }
 
+    private static final FastThreadLocal<HashSet<MessageId>> 
TL_MESSAGE_IDS_SET = new FastThreadLocal<HashSet<MessageId>>() {
+        @Override
+        protected HashSet<MessageId> initialValue() throws Exception {
+            return new HashSet<>();
+        }
+    };
+
     public UnAckedMessageTracker(PulsarClientImpl client, ConsumerBase<?> 
consumerBase, long ackTimeoutMillis, long tickDurationInMs) {
         Preconditions.checkArgument(tickDurationInMs > 0 && ackTimeoutMillis 
>= tickDurationInMs);
         this.ackTimeoutMillis = ackTimeoutMillis;
@@ -102,20 +112,21 @@ public class UnAckedMessageTracker implements Closeable {
         this.readLock = readWriteLock.readLock();
         this.writeLock = readWriteLock.writeLock();
         this.messageIdPartitionMap = new ConcurrentHashMap<>();
-        this.timePartitions = new LinkedList<>();
+        this.timePartitions = new ArrayDeque<>();
 
         int blankPartitions = (int)Math.ceil((double)this.ackTimeoutMillis / 
this.tickDurationInMs);
         for (int i = 0; i < blankPartitions + 1; i++) {
-            timePartitions.add(new ConcurrentOpenHashSet<>());
+            timePartitions.add(new ConcurrentOpenHashSet<>(16, 1));
         }
 
         timeout = client.timer().newTimeout(new TimerTask() {
             @Override
             public void run(Timeout t) throws Exception {
-                Set<MessageId> messageIds = new HashSet<>();
+                Set<MessageId> messageIds = TL_MESSAGE_IDS_SET.get();
+                messageIds.clear();
+
                 writeLock.lock();
                 try {
-                    timePartitions.addLast(new ConcurrentOpenHashSet<>());
                     ConcurrentOpenHashSet<MessageId> headPartition = 
timePartitions.removeFirst();
                     if (!headPartition.isEmpty()) {
                         log.warn("[{}] {} messages have timed-out", 
consumerBase, timePartitions.size());
@@ -124,6 +135,9 @@ public class UnAckedMessageTracker implements Closeable {
                             messageIdPartitionMap.remove(messageId);
                         });
                     }
+
+                    headPartition.clear();
+                    timePartitions.addLast(headPartition);
                 } finally {
                     writeLock.unlock();
                 }
@@ -140,11 +154,7 @@ public class UnAckedMessageTracker implements Closeable {
         writeLock.lock();
         try {
             messageIdPartitionMap.clear();
-            timePartitions.clear();
-            int blankPartitions = (int)Math.ceil((double)ackTimeoutMillis / 
tickDurationInMs);
-            for (int i = 0; i < blankPartitions + 1; i++) {
-                timePartitions.add(new ConcurrentOpenHashSet<>());
-            }
+            timePartitions.forEach(tp -> tp.clear());
         } finally {
             writeLock.unlock();
         }
diff --git 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
index a9cd070..36ba1d2 100644
--- 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
+++ 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
@@ -86,7 +86,7 @@ public class PulsarConnectorCache {
                 .setZkServers(pulsarConnectorConfig.getZookeeperUri())
                 .setClientTcpNoDelay(false)
                 .setUseV2WireProtocol(true)
-                .setStickyReadsEnabled(true)
+                .setStickyReadsEnabled(false)
                 .setReadEntryTimeout(60);
         return new ManagedLedgerFactoryImpl(bkClientConfiguration);
     }
diff --git 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
index 40f31c9..445564a 100644
--- 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
+++ 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
@@ -128,7 +128,7 @@ public class PulsarSplitManager implements 
ConnectorSplitManager {
         ClientConfiguration bkClientConfiguration = new ClientConfiguration()
                 .setZkServers(this.pulsarConnectorConfig.getZookeeperUri())
                 .setClientTcpNoDelay(false)
-                .setStickyReadsEnabled(true)
+                .setStickyReadsEnabled(false)
                 .setUseV2WireProtocol(true);
         return new ManagedLedgerFactoryImpl(bkClientConfiguration);
     }

Reply via email to