This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f13af48 Allow to configure ack-timeout tick time (#4760)
f13af48 is described below
commit f13af487699dcb36b7458ff4872a771837feefc6
Author: Matteo Merli <[email protected]>
AuthorDate: Sun Jul 21 01:51:05 2019 -0700
Allow to configure ack-timeout tick time (#4760)
### Motivation
After the changes in #3118, there has been a sharp increase in memory
utilization for the UnackedMessageTracker due to the time buckets being created.
This is especially true when the ack timeout is set to a larger value (e.g.
1h), where 3600 time buckets are created. This led to 20MB being used per
partition even when no message is tracked.
Allow configuring the tick time so that applications can tune it based
on their needs.
Additionally, fixed the logic that kept creating new hash maps and throwing
them away at each tick iteration. That created a lot of garbage and ignored
the fact that the hash maps expand to the required capacity (so by the next
iteration they are already the "right" size) and can simply be reused.
On a final note: the current default of 1 second seems very wasteful. Something
like 10 seconds would be a more appropriate default.
---
.../apache/pulsar/client/api/ConsumerBuilder.java | 25 ++++++++++++++----
.../pulsar/client/impl/ConsumerBuilderImpl.java | 9 +++++++
.../pulsar/client/impl/UnAckedMessageTracker.java | 30 ++++++++++++++--------
.../pulsar/sql/presto/PulsarConnectorCache.java | 2 +-
.../pulsar/sql/presto/PulsarSplitManager.java | 2 +-
5 files changed, 51 insertions(+), 17 deletions(-)
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index f8c5e0a..c4bf5a9 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -170,7 +170,7 @@ public interface ConsumerBuilder<T> extends Cloneable {
/**
* Set the timeout for unacked messages, truncated to the nearest
millisecond. The timeout needs to be greater than
- * 10 seconds.
+ * 1 second.
* <p>
* By default, the acknowledge timeout is disabled and that means that
messages delivered to a
* consumer will not be re-delivered unless the consumer crashes.
@@ -188,6 +188,21 @@ public interface ConsumerBuilder<T> extends Cloneable {
ConsumerBuilder<T> ackTimeout(long ackTimeout, TimeUnit timeUnit);
/**
+ * Define the granularity of the ack-timeout redelivery.
+ * <p>
+ * By default, the tick time is set to 1 second. Using a higher tick time
will
+ * reduce the memory overhead to track messages when the ack-timeout is
set to
+ * bigger values (e.g. 1 hour).
+ *
+ * @param tickTime
+ * the min precision for the ack timeout messages tracker
+ * @param timeUnit
+ * unit in which the timeout is provided.
+ * @return the consumer builder instance
+ */
+ ConsumerBuilder<T> ackTimeoutTickTime(long tickTime, TimeUnit timeUnit);
+
+ /**
* Set the delay to wait before re-delivering messages that have failed to
be process.
* <p>
* When application uses {@link Consumer#negativeAcknowledge(Message)},
the failed message
@@ -386,7 +401,7 @@ public interface ConsumerBuilder<T> extends Cloneable {
* C5 1 1
* Order in which broker dispatches messages to consumers: C1, C2, C3, C1,
C4, C5, C4
* </pre>
- *
+ *
* <b>Failover subscription</b>
* Broker selects active consumer for a failover-subscription based on
consumer's priority-level and lexicographical sorting of a consumer name.
* eg:
@@ -395,15 +410,15 @@ public interface ConsumerBuilder<T> extends Cloneable {
* Consumer PriorityLevel Name
* C1 0 aaa
* C2 0 bbb
- *
+ *
* 2. Active consumer = C2 : Consumer with highest priority
* Consumer PriorityLevel Name
* C1 1 aaa
* C2 0 bbb
- *
+ *
* Partitioned-topics:
* Broker evenly assigns partitioned topics to highest priority consumers.
- *
+ *
* </pre>
*
* @param priorityLevel the priority of this consumer
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 6913442..9eed128 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -58,6 +58,7 @@ public class ConsumerBuilderImpl<T> implements
ConsumerBuilder<T> {
private List<ConsumerInterceptor<T>> interceptorList;
private static long MIN_ACK_TIMEOUT_MILLIS = 1000;
+ private static long MIN_TICK_TIME_MILLIS = 100;
private static long DEFAULT_ACK_TIMEOUT_MILLIS_FOR_DEAD_LETTER = 30000L;
@@ -157,6 +158,14 @@ public class ConsumerBuilderImpl<T> implements
ConsumerBuilder<T> {
}
@Override
+ public ConsumerBuilder<T> ackTimeoutTickTime(long tickTime, TimeUnit
timeUnit) {
+ checkArgument(timeUnit.toMillis(tickTime) >= MIN_TICK_TIME_MILLIS,
+ "Ack timeout tick time should be greater than " +
MIN_TICK_TIME_MILLIS + " ms");
+ conf.setTickDurationMillis(timeUnit.toMillis(tickTime));
+ return this;
+ }
+
+ @Override
public ConsumerBuilder<T> negativeAckRedeliveryDelay(long redeliveryDelay,
TimeUnit timeUnit) {
checkArgument(redeliveryDelay >= 0, "redeliveryDelay needs to be >=
0");
conf.setNegativeAckRedeliveryDelayMicros(timeUnit.toMicros(redeliveryDelay));
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
index 0ab17d9..aa576b5 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
@@ -21,12 +21,15 @@ package org.apache.pulsar.client.impl;
import com.google.common.base.Preconditions;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
+import io.netty.util.concurrent.FastThreadLocal;
+
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
+import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
@@ -40,7 +43,7 @@ public class UnAckedMessageTracker implements Closeable {
private static final Logger log =
LoggerFactory.getLogger(UnAckedMessageTracker.class);
protected final ConcurrentHashMap<MessageId,
ConcurrentOpenHashSet<MessageId>> messageIdPartitionMap;
- protected final LinkedList<ConcurrentOpenHashSet<MessageId>>
timePartitions;
+ protected final ArrayDeque<ConcurrentOpenHashSet<MessageId>>
timePartitions;
protected final Lock readLock;
protected final Lock writeLock;
@@ -94,6 +97,13 @@ public class UnAckedMessageTracker implements Closeable {
this(client, consumerBase, ackTimeoutMillis, ackTimeoutMillis);
}
+ private static final FastThreadLocal<HashSet<MessageId>>
TL_MESSAGE_IDS_SET = new FastThreadLocal<HashSet<MessageId>>() {
+ @Override
+ protected HashSet<MessageId> initialValue() throws Exception {
+ return new HashSet<>();
+ }
+ };
+
public UnAckedMessageTracker(PulsarClientImpl client, ConsumerBase<?>
consumerBase, long ackTimeoutMillis, long tickDurationInMs) {
Preconditions.checkArgument(tickDurationInMs > 0 && ackTimeoutMillis
>= tickDurationInMs);
this.ackTimeoutMillis = ackTimeoutMillis;
@@ -102,20 +112,21 @@ public class UnAckedMessageTracker implements Closeable {
this.readLock = readWriteLock.readLock();
this.writeLock = readWriteLock.writeLock();
this.messageIdPartitionMap = new ConcurrentHashMap<>();
- this.timePartitions = new LinkedList<>();
+ this.timePartitions = new ArrayDeque<>();
int blankPartitions = (int)Math.ceil((double)this.ackTimeoutMillis /
this.tickDurationInMs);
for (int i = 0; i < blankPartitions + 1; i++) {
- timePartitions.add(new ConcurrentOpenHashSet<>());
+ timePartitions.add(new ConcurrentOpenHashSet<>(16, 1));
}
timeout = client.timer().newTimeout(new TimerTask() {
@Override
public void run(Timeout t) throws Exception {
- Set<MessageId> messageIds = new HashSet<>();
+ Set<MessageId> messageIds = TL_MESSAGE_IDS_SET.get();
+ messageIds.clear();
+
writeLock.lock();
try {
- timePartitions.addLast(new ConcurrentOpenHashSet<>());
ConcurrentOpenHashSet<MessageId> headPartition =
timePartitions.removeFirst();
if (!headPartition.isEmpty()) {
log.warn("[{}] {} messages have timed-out",
consumerBase, timePartitions.size());
@@ -124,6 +135,9 @@ public class UnAckedMessageTracker implements Closeable {
messageIdPartitionMap.remove(messageId);
});
}
+
+ headPartition.clear();
+ timePartitions.addLast(headPartition);
} finally {
writeLock.unlock();
}
@@ -140,11 +154,7 @@ public class UnAckedMessageTracker implements Closeable {
writeLock.lock();
try {
messageIdPartitionMap.clear();
- timePartitions.clear();
- int blankPartitions = (int)Math.ceil((double)ackTimeoutMillis /
tickDurationInMs);
- for (int i = 0; i < blankPartitions + 1; i++) {
- timePartitions.add(new ConcurrentOpenHashSet<>());
- }
+ timePartitions.forEach(tp -> tp.clear());
} finally {
writeLock.unlock();
}
diff --git
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
index a9cd070..36ba1d2 100644
---
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
+++
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
@@ -86,7 +86,7 @@ public class PulsarConnectorCache {
.setZkServers(pulsarConnectorConfig.getZookeeperUri())
.setClientTcpNoDelay(false)
.setUseV2WireProtocol(true)
- .setStickyReadsEnabled(true)
+ .setStickyReadsEnabled(false)
.setReadEntryTimeout(60);
return new ManagedLedgerFactoryImpl(bkClientConfiguration);
}
diff --git
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
index 40f31c9..445564a 100644
---
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
+++
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
@@ -128,7 +128,7 @@ public class PulsarSplitManager implements
ConnectorSplitManager {
ClientConfiguration bkClientConfiguration = new ClientConfiguration()
.setZkServers(this.pulsarConnectorConfig.getZookeeperUri())
.setClientTcpNoDelay(false)
- .setStickyReadsEnabled(true)
+ .setStickyReadsEnabled(false)
.setUseV2WireProtocol(true);
return new ManagedLedgerFactoryImpl(bkClientConfiguration);
}