This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d309b2c Refactored to reflect changes in BK for OrderedExecutor
(#1489)
d309b2c is described below
commit d309b2cb2f766151dc64861ba15e6dc71fbe8d8f
Author: Matteo Merli <[email protected]>
AuthorDate: Tue Apr 3 14:48:46 2018 -0700
Refactored to reflect changes in BK for OrderedExecutor (#1489)
* Refactored to reflect changes in BK for OrderedExecutor
* Reverted part of the last change
* Fixed test
---
.../bookkeeper/mledger/impl/EntryCacheImpl.java | 4 +-
.../bookkeeper/mledger/impl/EntryCacheManager.java | 2 +-
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 6 ++-
.../mledger/impl/ManagedLedgerFactoryImpl.java | 21 +++++-----
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 45 ++++++++++----------
.../mledger/impl/MetaStoreImplZookeeper.java | 36 +++++++++-------
.../apache/bookkeeper/mledger/impl/OpAddEntry.java | 4 +-
.../bookkeeper/mledger/impl/OpReadEntry.java | 6 +--
.../mledger/impl/EntryCacheManagerTest.java | 6 +--
.../mledger/impl/ManagedLedgerMBeanTest.java | 8 ++--
.../org/apache/pulsar/broker/PulsarService.java | 9 ++--
.../pulsar/broker/namespace/NamespaceService.java | 4 +-
.../pulsar/broker/service/BrokerService.java | 20 ++++-----
.../apache/pulsar/broker/service/ServerCnx.java | 2 +-
.../service/nonpersistent/NonPersistentTopic.java | 9 ++--
.../service/persistent/MessageDeduplication.java | 2 +-
.../PersistentDispatcherMultipleConsumers.java | 2 +-
.../broker/auth/SameThreadOrderedSafeExecutor.java | 14 +++----
.../broker/cache/ResourceQuotaCacheTest.java | 10 +----
.../broker/namespace/OwnershipCacheTest.java | 11 ++---
.../apache/pulsar/client/impl/ConsumerImpl.java | 2 +-
.../discovery/service/BrokerDiscoveryProvider.java | 1 -
.../service/web/ZookeeperCacheLoader.java | 10 +----
.../proxy/server/util/ZookeeperCacheLoader.java | 10 +----
.../pulsar/zookeeper/GlobalZooKeeperCache.java | 6 +--
.../pulsar/zookeeper/LocalZooKeeperCache.java | 9 ++--
.../apache/pulsar/zookeeper/ZooKeeperCache.java | 49 ++++++++++------------
.../zookeeper/ZookeeperBkClientFactoryImpl.java | 12 +++---
.../pulsar/zookeeper/ZookeeperCacheTest.java | 12 +++---
29 files changed, 152 insertions(+), 180 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
index f267a6c..60ba634 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
@@ -188,7 +188,7 @@ public class EntryCacheImpl implements EntryCache {
manager.mlFactoryMBean.recordCacheMiss(1,
returnEntry.getLength());
ml.mbean.addReadEntriesSample(1, returnEntry.getLength());
- ml.getExecutor().submitOrdered(ml.getName(), safeRun(() ->
{
+ ml.getExecutor().executeOrdered(ml.getName(), safeRun(()
-> {
callback.readEntryComplete(returnEntry, obj);
}));
} else {
@@ -254,7 +254,7 @@ public class EntryCacheImpl implements EntryCache {
checkNotNull(ml.getName());
checkNotNull(ml.getExecutor());
- ml.getExecutor().submitOrdered(ml.getName(), safeRun(() -> {
+ ml.getExecutor().executeOrdered(ml.getName(), safeRun(() -> {
// We got the entries, we need to transform them to a
List<> type
long totalSize = 0;
final List<EntryImpl> entriesToReturn =
Lists.newArrayListWithExpectedSize(entriesToRead);
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
index 7faa18c..262cbeb 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
@@ -109,7 +109,7 @@ public class EntryCacheManager {
// Trigger a single eviction in background. While the eviction is
running we stop inserting entries in the cache
if (currentSize > evictionTriggerThreshold &&
evictionInProgress.compareAndSet(false, true)) {
- mlFactory.executor.execute(safeRun(() -> {
+ mlFactory.scheduledExecutor.execute(safeRun(() -> {
// Trigger a new cache eviction cycle to bring the used memory
below the cacheEvictionWatermark
// percentage limit
long sizeToEvict = currentSize - (long) (maxSize *
cacheEvictionWatermak);
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 29fe0a7..a2be71d 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -37,6 +37,7 @@ import com.google.common.collect.Sets;
import com.google.common.collect.TreeRangeSet;
import com.google.common.util.concurrent.RateLimiter;
import com.google.protobuf.InvalidProtocolBufferException;
+
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.List;
@@ -51,6 +52,7 @@ import
java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
+
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
import org.apache.bookkeeper.client.BKException;
@@ -847,7 +849,7 @@ public class ManagedCursorImpl implements ManagedCursor {
final PositionImpl newPosition = (PositionImpl) newPos;
// order trim and reset operations on a ledger
- ledger.getExecutor().submitOrdered(ledger.getName(), safeRun(() -> {
+ ledger.getExecutor().executeOrdered(ledger.getName(), safeRun(() -> {
if (ledger.isValidPosition(newPosition) ||
newPosition.equals(PositionImpl.earliest)
|| newPosition.equals(PositionImpl.latest)) {
internalResetCursor(newPosition, callback);
@@ -1923,7 +1925,7 @@ public class ManagedCursorImpl implements ManagedCursor {
bookkeeper.asyncCreateLedger(config.getMetadataEnsemblesize(),
config.getMetadataWriteQuorumSize(),
config.getMetadataAckQuorumSize(), config.getDigestType(),
config.getPassword(), (rc, lh, ctx) -> {
- ledger.getExecutor().submit(safeRun(() -> {
+ ledger.getExecutor().execute(safeRun(() -> {
ledger.mbean.endCursorLedgerCreateOp();
if (rc != BKException.Code.OK) {
log.warn("[{}] Error creating ledger for cursor
{}: {}", ledger.getName(), name,
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 4c0d3d7..d148d1d 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -23,7 +23,7 @@ import static
org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLed
import com.google.common.base.Predicates;
import com.google.common.collect.Maps;
-import io.netty.util.concurrent.DefaultThreadFactory;
+
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -31,12 +31,12 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -63,7 +63,6 @@ import
org.apache.bookkeeper.mledger.proto.MLDataFormats.LongProperty;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange;
import org.apache.bookkeeper.mledger.util.Futures;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
@@ -77,9 +76,9 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
private final boolean isBookkeeperManaged;
private final ZooKeeper zookeeper;
private final ManagedLedgerFactoryConfig config;
- protected final ScheduledExecutorService executor =
Executors.newScheduledThreadPool(16,
- new DefaultThreadFactory("bookkeeper-ml"));
- private final OrderedScheduler orderedExecutor =
OrderedScheduler.newSchedulerBuilder().numThreads(16)
+ protected final OrderedScheduler scheduledExecutor =
OrderedScheduler.newSchedulerBuilder().numThreads(16)
+ .name("bookkeeper-ml-scheduler").build();
+ private final OrderedExecutor orderedExecutor =
OrderedExecutor.newBuilder().numThreads(16)
.name("bookkeeper-ml-workers").build();
protected final ManagedLedgerFactoryMBeanImpl mbean;
@@ -122,7 +121,7 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
this.config = config;
this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
this.entryCacheManager = new EntryCacheManager(this);
- this.statsTask = executor.scheduleAtFixedRate(() -> refreshStats(), 0,
StatsPeriodSeconds, TimeUnit.SECONDS);
+ this.statsTask = scheduledExecutor.scheduleAtFixedRate(() ->
refreshStats(), 0, StatsPeriodSeconds, TimeUnit.SECONDS);
}
public ManagedLedgerFactoryImpl(BookKeeper bookKeeper, ZooKeeper
zooKeeper) throws Exception {
@@ -138,7 +137,7 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
this.config = config;
this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
this.entryCacheManager = new EntryCacheManager(this);
- this.statsTask = executor.scheduleAtFixedRate(() -> refreshStats(), 0,
StatsPeriodSeconds, TimeUnit.SECONDS);
+ this.statsTask = scheduledExecutor.scheduleAtFixedRate(() ->
refreshStats(), 0, StatsPeriodSeconds, TimeUnit.SECONDS);
}
private synchronized void refreshStats() {
@@ -232,7 +231,7 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
ledgers.computeIfAbsent(name, (mlName) -> {
// Create the managed ledger
CompletableFuture<ManagedLedgerImpl> future = new
CompletableFuture<>();
- final ManagedLedgerImpl newledger = new ManagedLedgerImpl(this,
bookKeeper, store, config, executor,
+ final ManagedLedgerImpl newledger = new ManagedLedgerImpl(this,
bookKeeper, store, config, scheduledExecutor,
orderedExecutor, name);
newledger.initialize(new ManagedLedgerInitializeLedgerCallback() {
@Override
@@ -305,7 +304,7 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
}
}
- executor.shutdown();
+ scheduledExecutor.shutdown();
orderedExecutor.shutdown();
entryCacheManager.clear();
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 34283ed..4aceb6c 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -22,18 +22,26 @@ import static
com.google.common.base.Preconditions.checkArgument;
import static java.lang.Math.min;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
+import com.google.common.collect.BoundType;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+import com.google.common.collect.Range;
+import com.google.common.util.concurrent.RateLimiter;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
-import java.util.Queue;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
@@ -45,6 +53,7 @@ import
org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
@@ -77,23 +86,13 @@ import org.apache.bookkeeper.mledger.util.CallbackMutex;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.bookkeeper.mledger.util.Pair;
import org.apache.pulsar.common.api.Commands;
-import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import
org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.BoundType;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
-import com.google.common.collect.Range;
-import com.google.common.util.concurrent.RateLimiter;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-
public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private final static long MegaByte = 1024 * 1024;
@@ -190,8 +189,8 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
AtomicReferenceFieldUpdater.newUpdater(ManagedLedgerImpl.class,
State.class, "state");
private volatile State state = null;
- private final ScheduledExecutorService scheduledExecutor;
- private final OrderedScheduler executor;
+ private final OrderedScheduler scheduledExecutor;
+ private final OrderedExecutor executor;
final ManagedLedgerFactoryImpl factory;
protected final ManagedLedgerMBeanImpl mbean;
@@ -204,7 +203,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
// //////////////////////////////////////////////////////////////////////
public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper
bookKeeper, MetaStore store,
- ManagedLedgerConfig config, ScheduledExecutorService
scheduledExecutor, OrderedScheduler orderedExecutor,
+ ManagedLedgerConfig config, OrderedScheduler scheduledExecutor,
OrderedExecutor orderedExecutor,
final String name) {
this.factory = factory;
this.bookKeeper = bookKeeper;
@@ -250,7 +249,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
if (ledgers.size() > 0) {
final long id = ledgers.lastKey();
OpenCallback opencb = (rc, lh, ctx1) -> {
- executor.submitOrdered(name, safeRun(() -> {
+ executor.executeOrdered(name, safeRun(() -> {
mbean.endDataLedgerOpenOp();
if (log.isDebugEnabled()) {
log.debug("[{}] Opened ledger {}: ", name, id,
BKException.getMessage(rc));
@@ -338,7 +337,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
mbean.startDataLedgerCreateOp();
bookKeeper.asyncCreateLedger(config.getEnsembleSize(),
config.getWriteQuorumSize(), config.getAckQuorumSize(),
config.getDigestType(), config.getPassword(), (rc, lh, ctx) ->
{
- executor.submitOrdered(name, safeRun(() -> {
+ executor.executeOrdered(name, safeRun(() -> {
mbean.endDataLedgerCreateOp();
if (rc != BKException.Code.OK) {
callback.initializeFailed(createManagedLedgerException(rc));
@@ -1319,7 +1318,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
mbean.startDataLedgerOpenOp();
bookKeeper.asyncOpenLedger(ledgerId, config.getDigestType(),
config.getPassword(),
(int rc, LedgerHandle lh, Object ctx) -> {
- executor.submit(safeRun(() -> {
+ executor.executeOrdered(name, safeRun(() -> {
mbean.endDataLedgerOpenOp();
if (rc != BKException.Code.OK) {
// Remove the ledger future from cache to give
chance to reopen it later
@@ -1484,12 +1483,12 @@ public class ManagedLedgerImpl implements
ManagedLedger, CreateCallback {
break;
}
- executor.submit(safeRun(() ->
waitingCursor.notifyEntriesAvailable()));
+ executor.execute(safeRun(() ->
waitingCursor.notifyEntriesAvailable()));
}
}
private void trimConsumedLedgersInBackground() {
- executor.submitOrdered(name, safeRun(() -> {
+ executor.executeOrdered(name, safeRun(() -> {
internalTrimConsumedLedgers();
}));
}
@@ -2113,11 +2112,11 @@ public class ManagedLedgerImpl implements
ManagedLedger, CreateCallback {
return ledgers;
}
- ScheduledExecutorService getScheduledExecutor() {
+ OrderedScheduler getScheduledExecutor() {
return scheduledExecutor;
}
- OrderedScheduler getExecutor() {
+ OrderedExecutor getExecutor() {
return executor;
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java
index 7a76975..31705c1 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java
@@ -20,11 +20,16 @@ package org.apache.bookkeeper.mledger.impl;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
+import com.google.common.base.Charsets;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.TextFormat;
+import com.google.protobuf.TextFormat.ParseException;
+
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
import
org.apache.bookkeeper.mledger.ManagedLedgerException.BadVersionException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
@@ -40,11 +45,6 @@ import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Charsets;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.TextFormat;
-import com.google.protobuf.TextFormat.ParseException;
-
@SuppressWarnings("checkstyle:javadoctype")
public class MetaStoreImplZookeeper implements MetaStore {
@@ -55,7 +55,7 @@ public class MetaStoreImplZookeeper implements MetaStore {
private static final String prefix = prefixName + "/";
private final ZooKeeper zk;
- private final OrderedScheduler executor;
+ private final OrderedExecutor executor;
private static class ZKStat implements Stat {
private final int version;
@@ -90,7 +90,7 @@ public class MetaStoreImplZookeeper implements MetaStore {
}
}
- public MetaStoreImplZookeeper(ZooKeeper zk, OrderedScheduler executor)
+ public MetaStoreImplZookeeper(ZooKeeper zk, OrderedExecutor executor)
throws Exception {
this.zk = zk;
this.executor = executor;
@@ -130,7 +130,8 @@ public class MetaStoreImplZookeeper implements MetaStore {
@Override
public void getManagedLedgerInfo(final String ledgerName, final
MetaStoreCallback<ManagedLedgerInfo> callback) {
// Try to get the content or create an empty node
- zk.getData(prefix + ledgerName, false, (rc, path, ctx, readData, stat)
-> executor.submit(safeRun(() -> {
+ zk.getData(prefix + ledgerName, false,
+ (rc, path, ctx, readData, stat) ->
executor.executeOrdered(ledgerName, safeRun(() -> {
if (rc == Code.OK.intValue()) {
try {
ManagedLedgerInfo info = parseManagedLedgerInfo(readData);
@@ -171,7 +172,7 @@ public class MetaStoreImplZookeeper implements MetaStore {
byte[] serializedMlInfo = mlInfo.toByteArray(); // Binary format
zk.setData(prefix + ledgerName, serializedMlInfo, zkStat.getVersion(),
- (rc, path, zkCtx, stat1) -> executor.submit(safeRun(() -> {
+ (rc, path, zkCtx, stat1) ->
executor.executeOrdered(ledgerName, safeRun(() -> {
if (log.isDebugEnabled()) {
log.debug("[{}] UpdateLedgersIdsCallback.processResult
rc={} newVersion={}", ledgerName,
Code.get(rc), stat != null ? stat.getVersion()
: "null");
@@ -195,7 +196,8 @@ public class MetaStoreImplZookeeper implements MetaStore {
if (log.isDebugEnabled()) {
log.debug("[{}] Get cursors list", ledgerName);
}
- zk.getChildren(prefix + ledgerName, false, (rc, path, ctx, children,
stat) -> executor.submit(safeRun(() -> {
+ zk.getChildren(prefix + ledgerName, false,
+ (rc, path, ctx, children, stat) ->
executor.executeOrdered(ledgerName, safeRun(() -> {
if (log.isDebugEnabled()) {
log.debug("[{}] getConsumers complete rc={} children={}",
ledgerName, Code.get(rc), children);
}
@@ -219,7 +221,7 @@ public class MetaStoreImplZookeeper implements MetaStore {
log.debug("Reading from {}", path);
}
- zk.getData(path, false, (rc, path1, ctx, data, stat) ->
executor.submit(safeRun(() -> {
+ zk.getData(path, false, (rc, path1, ctx, data, stat) ->
executor.executeOrdered(ledgerName, safeRun(() -> {
if (rc != Code.OK.intValue()) {
callback.operationFailed(new
MetaStoreException(KeeperException.create(Code.get(rc))));
} else {
@@ -251,7 +253,7 @@ public class MetaStoreImplZookeeper implements MetaStore {
log.debug("[{}] Creating consumer {} on meta-data store with
{}", ledgerName, cursorName, info);
}
zk.create(path, content, Acl, CreateMode.PERSISTENT,
- (rc, path1, ctx, name) -> executor.submit(safeRun(() -> {
+ (rc, path1, ctx, name) ->
executor.executeOrdered(ledgerName, safeRun(() -> {
if (rc != Code.OK.intValue()) {
log.warn("[{}] Error creating cosumer {} node on
meta-data store with {}: ", ledgerName,
cursorName, info, Code.get(rc));
@@ -269,7 +271,8 @@ public class MetaStoreImplZookeeper implements MetaStore {
if (log.isDebugEnabled()) {
log.debug("[{}] Updating consumer {} on meta-data store with
{}", ledgerName, cursorName, info);
}
- zk.setData(path, content, zkStat.getVersion(), (rc, path1, ctx,
stat1) -> executor.submit(safeRun(() -> {
+ zk.setData(path, content, zkStat.getVersion(),
+ (rc, path1, ctx, stat1) ->
executor.executeOrdered(ledgerName, safeRun(() -> {
if (rc == Code.BADVERSION.intValue()) {
callback.operationFailed(new
BadVersionException(KeeperException.create(Code.get(rc))));
} else if (rc != Code.OK.intValue()) {
@@ -285,7 +288,8 @@ public class MetaStoreImplZookeeper implements MetaStore {
public void asyncRemoveCursor(final String ledgerName, final String
consumerName,
final MetaStoreCallback<Void> callback) {
log.info("[{}] Remove consumer={}", ledgerName, consumerName);
- zk.delete(prefix + ledgerName + "/" + consumerName, -1, (rc, path,
ctx) -> executor.submit(safeRun(() -> {
+ zk.delete(prefix + ledgerName + "/" + consumerName, -1,
+ (rc, path, ctx) -> executor.executeOrdered(ledgerName,
safeRun(() -> {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] zk delete done. rc={}", ledgerName,
consumerName, Code.get(rc));
}
@@ -300,7 +304,7 @@ public class MetaStoreImplZookeeper implements MetaStore {
@Override
public void removeManagedLedger(String ledgerName, MetaStoreCallback<Void>
callback) {
log.info("[{}] Remove ManagedLedger", ledgerName);
- zk.delete(prefix + ledgerName, -1, (rc, path, ctx) ->
executor.submit(safeRun(() -> {
+ zk.delete(prefix + ledgerName, -1, (rc, path, ctx) ->
executor.executeOrdered(ledgerName, safeRun(() -> {
if (log.isDebugEnabled()) {
log.debug("[{}] zk delete done. rc={}", ledgerName,
Code.get(rc));
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index 85c4da6..f4daec5 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -123,14 +123,14 @@ class OpAddEntry extends SafeRunnable implements
AddCallback, CloseCallback {
// be marked as failed.
ml.mbean.recordAddEntryError();
- ml.getExecutor().submitOrdered(ml.getName(), SafeRun.safeRun(() ->
{
+ ml.getExecutor().executeOrdered(ml.getName(), SafeRun.safeRun(()
-> {
// Force the creation of a new ledger. Doing it in a
background thread to avoid acquiring ML lock
// from a BK callback.
ml.ledgerClosed(lh);
}));
} else {
// Trigger addComplete callback in a thread hashed on the managed
ledger name
- ml.getExecutor().submitOrdered(ml.getName(), this);
+ ml.getExecutor().executeOrdered(ml.getName(), this);
}
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
index 31d1c79..6b3a03a 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
@@ -83,7 +83,7 @@ class OpReadEntry implements ReadEntriesCallback {
if (!entries.isEmpty()) {
// There were already some entries that were read before, we can
return them
- cursor.ledger.getExecutor().submit(safeRun(() -> {
+ cursor.ledger.getExecutor().execute(safeRun(() -> {
callback.readEntriesComplete(entries, ctx);
recycle();
}));
@@ -131,7 +131,7 @@ class OpReadEntry implements ReadEntriesCallback {
}
// Schedule next read in a different thread
- cursor.ledger.getExecutor().submit(safeRun(() -> {
+ cursor.ledger.getExecutor().execute(safeRun(() -> {
readPosition =
cursor.ledger.startReadOperationOnLedger(nextReadPosition);
cursor.ledger.asyncReadEntries(OpReadEntry.this);
}));
@@ -139,7 +139,7 @@ class OpReadEntry implements ReadEntriesCallback {
// The reading was already completed, release resources and
trigger callback
cursor.readOperationCompleted();
- cursor.ledger.getExecutor().submit(safeRun(() -> {
+ cursor.ledger.getExecutor().execute(safeRun(() -> {
callback.readEntriesComplete(entries, ctx);
recycle();
}));
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
index 107abcf..47c515d 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
@@ -24,9 +24,9 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
@@ -41,7 +41,7 @@ public class EntryCacheManagerTest extends
MockedBookKeeperTestCase {
@BeforeClass
void setup() throws Exception {
- ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor();
+ OrderedScheduler executor =
OrderedScheduler.newSchedulerBuilder().numThreads(1).build();
ml1 = mock(ManagedLedgerImpl.class);
when(ml1.getScheduledExecutor()).thenReturn(executor);
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanTest.java
index c3e75c2..ebaa1fd 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanTest.java
@@ -65,7 +65,7 @@ public class ManagedLedgerMBeanTest extends
MockedBookKeeperTestCase {
mbean.addLedgerSwitchLatencySample(1, TimeUnit.SECONDS);
// Simulate stats getting update from different thread
- factory.executor.submit(() -> {
+ factory.scheduledExecutor.submit(() -> {
mbean.refreshStats(1, TimeUnit.SECONDS);
}).get();
@@ -90,7 +90,7 @@ public class ManagedLedgerMBeanTest extends
MockedBookKeeperTestCase {
ledger.addEntry(new byte[600]);
cursor.markDelete(p1);
- factory.executor.submit(() -> {
+ factory.scheduledExecutor.submit(() -> {
mbean.refreshStats(1, TimeUnit.SECONDS);
}).get();
@@ -109,7 +109,7 @@ public class ManagedLedgerMBeanTest extends
MockedBookKeeperTestCase {
mbean.recordAddEntryError();
mbean.recordReadEntriesError();
- factory.executor.submit(() -> {
+ factory.scheduledExecutor.submit(() -> {
mbean.refreshStats(1, TimeUnit.SECONDS);
}).get();
@@ -119,7 +119,7 @@ public class ManagedLedgerMBeanTest extends
MockedBookKeeperTestCase {
List<Entry> entries = cursor.readEntries(100);
assertEquals(entries.size(), 1);
- factory.executor.submit(() -> {
+ factory.scheduledExecutor.submit(() -> {
mbean.refreshStats(1, TimeUnit.SECONDS);
}).get();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 58010e8..47753e6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -38,6 +38,7 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
@@ -106,8 +107,8 @@ public class PulsarService implements AutoCloseable {
new DefaultThreadFactory("pulsar"));
private final ScheduledExecutorService cacheExecutor =
Executors.newScheduledThreadPool(10,
new DefaultThreadFactory("zk-cache-callback"));
- private final OrderedScheduler orderedExecutor =
OrderedScheduler.newSchedulerBuilder().numThreads(8)
- .name("pulsar-ordered").build();
+ private final OrderedExecutor orderedExecutor =
OrderedExecutor.newBuilder().numThreads(8).name("pulsar-ordered")
+ .build();
private final ScheduledExecutorService loadManagerExecutor;
private ScheduledFuture<?> loadReportTask = null;
private ScheduledFuture<?> loadSheddingTask = null;
@@ -445,7 +446,7 @@ public class PulsarService implements AutoCloseable {
LOG.info("starting configuration cache service");
- this.localZkCache = new LocalZooKeeperCache(getZkClient(),
getOrderedExecutor(), this.cacheExecutor);
+ this.localZkCache = new LocalZooKeeperCache(getZkClient(),
getOrderedExecutor());
this.globalZkCache = new
GlobalZooKeeperCache(getZooKeeperClientFactory(),
(int) config.getZooKeeperSessionTimeoutMillis(),
config.getGlobalZookeeperServers(),
getOrderedExecutor(), this.cacheExecutor);
@@ -611,7 +612,7 @@ public class PulsarService implements AutoCloseable {
return loadManagerExecutor;
}
- public OrderedScheduler getOrderedExecutor() {
+ public OrderedExecutor getOrderedExecutor() {
return orderedExecutor;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index fe30713..2668f95 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -636,8 +636,8 @@ public class NamespaceService {
if (t != null) {
// retry several times on BadVersion
if ((t instanceof ServerMetadataException) &&
(counter.decrementAndGet() >= 0)) {
- pulsar.getOrderedExecutor().submit(
- () -> splitAndOwnBundleOnceAndRetry(bundle, unload,
counter, unloadFuture));
+ pulsar.getOrderedExecutor()
+ .execute(() ->
splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture));
} else {
// Retry enough, or meet other exception
String msg2 = format(" %s not success update nsBundles,
counter %d, reason %s",
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index e1a0d3f..6c2e011 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -476,7 +476,7 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
replicationFuture.exceptionally((ex) -> {
log.warn("Replication check failed. Removing topic from topics
list {}, {}", topic, ex);
nonPersistentTopic.stopReplProducers().whenComplete((v, exception)
-> {
- pulsar.getExecutor().submit(() -> topics.remove(topic,
topicFuture));
+ pulsar.getExecutor().execute(() -> topics.remove(topic,
topicFuture));
topicFuture.completeExceptionally(ex);
});
@@ -579,7 +579,7 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
// namespace is being unloaded
String msg = String.format("Namespace is being unloaded, cannot
add topic %s", topic);
log.warn(msg);
- pulsar.getExecutor().submit(() -> topics.remove(topic,
topicFuture));
+ pulsar.getExecutor().execute(() -> topics.remove(topic,
topicFuture));
topicFuture.completeExceptionally(new
ServiceUnitNotReadyException(msg));
return;
}
@@ -618,7 +618,7 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
});
} catch (NamingException e) {
log.warn("Failed to create topic {}-{}",
topic, e.getMessage());
- pulsar.getExecutor().submit(() ->
topics.remove(topic, topicFuture));
+ pulsar.getExecutor().execute(() ->
topics.remove(topic, topicFuture));
topicFuture.completeExceptionally(e);
}
}
@@ -626,7 +626,7 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
@Override
public void openLedgerFailed(ManagedLedgerException
exception, Object ctx) {
log.warn("Failed to create topic {}", topic,
exception);
- pulsar.getExecutor().submit(() ->
topics.remove(topic, topicFuture));
+ pulsar.getExecutor().execute(() ->
topics.remove(topic, topicFuture));
topicFuture.completeExceptionally(new
PersistenceException(exception));
}
}, null);
@@ -635,7 +635,7 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
log.warn("[{}] Failed to get topic configuration: {}", topic,
exception.getMessage(), exception);
// remove topic from topics-map in different thread to avoid
possible deadlock if
// createPersistentTopic-thread only tries to handle this
future-result
- pulsar.getExecutor().submit(() -> topics.remove(topic,
topicFuture));
+ pulsar.getExecutor().execute(() -> topics.remove(topic,
topicFuture));
topicFuture.completeExceptionally(exception);
return null;
});
@@ -644,7 +644,7 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
public CompletableFuture<ManagedLedgerConfig>
getManagedLedgerConfig(TopicName topicName) {
CompletableFuture<ManagedLedgerConfig> future = new
CompletableFuture<>();
// Execute in background thread, since getting the policies might
block if the z-node wasn't already cached
- pulsar.getOrderedExecutor().submitOrdered(topicName, safeRun(() -> {
+ pulsar.getOrderedExecutor().executeOrdered(topicName, safeRun(() -> {
NamespaceName namespace = topicName.getNamespaceObject();
ServiceConfiguration serviceConfig = pulsar.getConfiguration();
@@ -1104,7 +1104,7 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
}
private void updateTopicMessageDispatchRate() {
- this.pulsar().getExecutor().submit(() -> {
+ this.pulsar().getExecutor().execute(() -> {
// update message-rate for each topic
topics.forEach((name, topicFuture) -> {
if (topicFuture.isDone()) {
@@ -1150,7 +1150,7 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
}
private void updateManagedLedgerConfig() {
- this.pulsar().getExecutor().submit(() -> {
+ this.pulsar().getExecutor().execute(() -> {
// update managed-ledger config of each topic
topics.forEach((name, topicFuture) -> {
if (topicFuture.isDone()) {
@@ -1403,7 +1403,7 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
// block dispatcher with higher unack-msg when it reaches
broker-unack msg limit
log.info("[{}] Starting blocking dispatchers with unacked msgs {}
due to reached max broker limit {}",
maxUnackedMessages, maxUnackedMsgsPerDispatcher);
- executor().submit(() -> blockDispatchersWithLargeUnAckMessages());
+ executor().execute(() -> blockDispatchersWithLargeUnAckMessages());
} else if (blockedDispatcherOnHighUnackedMsgs.get() && unAckedMessages
< maxUnackedMessages / 2) {
// unblock broker-dispatching if received enough acked messages
back
if (blockedDispatcherOnHighUnackedMsgs.compareAndSet(true, false))
{
@@ -1457,7 +1457,7 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
try {
dispatcherList.forEach(dispatcher -> {
dispatcher.unBlockDispatcherOnUnackedMsgs();
- executor().submit(() -> dispatcher.readMoreEntries());
+ executor().execute(() -> dispatcher.readMoreEntries());
log.info("[{}] Dispatcher is unblocked", dispatcher.getName());
blockedDispatchers.remove(dispatcher);
});
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index bada69c..c052e4c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -955,7 +955,7 @@ public class ServerCnx extends PulsarHandler {
if (nonPersistentPendingMessages >
MaxNonPersistentPendingMessages) {
final long producerId = send.getProducerId();
final long sequenceId = send.getSequenceId();
-
service.getTopicOrderedExecutor().submitOrdered(producer.getTopic().getName(),
SafeRun.safeRun(() -> {
+
service.getTopicOrderedExecutor().executeOrdered(producer.getTopic().getName(),
SafeRun.safeRun(() -> {
ctx.writeAndFlush(Commands.newSendReceipt(producerId,
sequenceId, -1, -1), ctx.voidPromise());
}));
producer.recordMessageDrop(send.getNumMessages());
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index fc6a920..9eb7ce7 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -27,8 +27,10 @@ import com.google.common.base.MoreObjects;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.FastThreadLocal;
+
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -39,6 +41,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
@@ -190,7 +193,7 @@ public class NonPersistentTopic implements Topic {
// retain data for sub/replication because io-thread will release
actual payload
data.retain(2);
- this.executor.submitOrdered(topic, SafeRun.safeRun(() -> {
+ this.executor.executeOrdered(topic, SafeRun.safeRun(() -> {
subscriptions.forEach((name, subscription) -> {
ByteBuf duplicateBuffer = data.retainedDuplicate();
Entry entry = create(0L, 0L, duplicateBuffer);
@@ -210,7 +213,7 @@ public class NonPersistentTopic implements Topic {
}
}));
- this.executor.submitOrdered(topic, SafeRun.safeRun(() -> {
+ this.executor.executeOrdered(topic, SafeRun.safeRun(() -> {
replicators.forEach((name, replicator) -> {
ByteBuf duplicateBuffer = data.retainedDuplicate();
Entry entry = create(0L, 0L, duplicateBuffer);
@@ -487,7 +490,7 @@ public class NonPersistentTopic implements Topic {
FutureUtil.waitForAll(futures).thenRun(() -> {
log.info("[{}] Topic closed", topic);
- brokerService.pulsar().getExecutor().submit(() ->
brokerService.removeTopicFromCache(topic));
+ brokerService.pulsar().getExecutor().execute(() ->
brokerService.removeTopicFromCache(topic));
closeFuture.complete(null);
}).exceptionally(exception -> {
log.error("[{}] Error closing topic", topic, exception);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index e7e50b8..443d29b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -152,7 +152,7 @@ public class MessageDeduplication {
if (managedCursor.hasMoreEntries()) {
// Read next batch of entries
- pulsar.getExecutor().submit(() -> replayCursor(future));
+ pulsar.getExecutor().execute(() -> replayCursor(future));
} else {
// Done replaying
future.complete(null);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 2eb83ae..17c5db7 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -589,7 +589,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMu
// unblock dispatcher if it acks back enough messages
if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this,
TRUE, FALSE)) {
log.info("[{}] Dispatcher is unblocked", name);
- topic.getBrokerService().executor().submit(() ->
readMoreEntries());
+ topic.getBrokerService().executor().execute(() ->
readMoreEntries());
}
}
// increment broker-level count
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/SameThreadOrderedSafeExecutor.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/SameThreadOrderedSafeExecutor.java
index 98cafc5..2909857 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/SameThreadOrderedSafeExecutor.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/SameThreadOrderedSafeExecutor.java
@@ -18,30 +18,30 @@
*/
package org.apache.pulsar.broker.auth;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
+import io.netty.util.concurrent.DefaultThreadFactory;
+
+import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.SafeRunnable;
import org.apache.bookkeeper.stats.NullStatsLogger;
-import io.netty.util.concurrent.DefaultThreadFactory;
-
-public class SameThreadOrderedSafeExecutor extends OrderedScheduler {
+public class SameThreadOrderedSafeExecutor extends OrderedExecutor {
public SameThreadOrderedSafeExecutor() {
super("same-thread-executor", 1, new DefaultThreadFactory("test"),
NullStatsLogger.INSTANCE, false, 100000, 10);
}
@Override
- public void submit(SafeRunnable r) {
+ public void execute(Runnable r) {
r.run();
}
@Override
- public void submitOrdered(int orderingKey, SafeRunnable r) {
+ public void executeOrdered(int orderingKey, SafeRunnable r) {
r.run();
}
@Override
- public void submitOrdered(long orderingKey, SafeRunnable r) {
+ public void executeOrdered(long orderingKey, SafeRunnable r) {
r.run();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java
index 9fd2fb7..2fe63e8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java
@@ -25,8 +25,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
+import com.google.common.hash.Hashing;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.broker.PulsarService;
@@ -43,8 +42,6 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import com.google.common.hash.Hashing;
-
public class ResourceQuotaCacheTest {
private PulsarService pulsar;
@@ -52,14 +49,12 @@ public class ResourceQuotaCacheTest {
private LocalZooKeeperCacheService localCache;
private NamespaceBundleFactory bundleFactory;
private OrderedScheduler executor;
- private ScheduledExecutorService scheduledExecutor;
@BeforeMethod
public void setup() throws Exception {
pulsar = mock(PulsarService.class);
executor =
OrderedScheduler.newSchedulerBuilder().numThreads(1).name("test").build();
- scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
- zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(),
executor, scheduledExecutor);
+ zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(),
executor);
localCache = new LocalZooKeeperCacheService(zkCache, null);
// set mock pulsar localzkcache
@@ -77,7 +72,6 @@ public class ResourceQuotaCacheTest {
@AfterMethod
public void teardown() {
executor.shutdown();
- scheduledExecutor.shutdown();
}
@Test
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
index e8d4bb9..dfca1b2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
@@ -33,10 +33,10 @@ import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import com.google.common.hash.Hashing;
+
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.common.util.OrderedScheduler;
@@ -58,8 +58,6 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import com.google.common.hash.Hashing;
-
public class OwnershipCacheTest {
private PulsarService pulsar;
@@ -71,7 +69,6 @@ public class OwnershipCacheTest {
private NamespaceService nsService;
private BrokerService brokerService;
private OrderedScheduler executor;
- private ScheduledExecutorService scheduledExecutor;
@BeforeMethod
public void setup() throws Exception {
@@ -80,8 +77,7 @@ public class OwnershipCacheTest {
pulsar = mock(PulsarService.class);
config = mock(ServiceConfiguration.class);
executor =
OrderedScheduler.newSchedulerBuilder().numThreads(1).name("test").build();
- scheduledExecutor = Executors.newScheduledThreadPool(2);
- zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(),
executor, scheduledExecutor);
+ zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(),
executor);
localCache = spy(new LocalZooKeeperCacheService(zkCache, null));
ZooKeeperDataCache<LocalPolicies> poilciesCache =
mock(ZooKeeperDataCache.class);
when(pulsar.getLocalZkCacheService()).thenReturn(localCache);
@@ -106,7 +102,6 @@ public class OwnershipCacheTest {
@AfterMethod
public void teardown() throws Exception {
executor.shutdown();
- scheduledExecutor.shutdown();
}
@Test
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index c468298..2cbd642 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -655,7 +655,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
return;
}
- listenerExecutor.submit(() -> {
+ listenerExecutor.execute(() -> {
if (isActive) {
consumerEventListener.becameActive(this, partitionIndex);
} else {
diff --git
a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
index a6e3cbc..9a4b9aa 100644
---
a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
+++
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
@@ -31,7 +31,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.common.util.OrderedScheduler;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.common.naming.TopicName;
diff --git
a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java
index 30f75be..1371d05 100644
---
a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java
+++
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java
@@ -22,8 +22,6 @@ import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -37,8 +35,6 @@ import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.netty.util.concurrent.DefaultThreadFactory;
-
/**
* Connects with ZooKeeper and sets watch to listen changes for active broker
list.
*
@@ -55,8 +51,6 @@ public class ZookeeperCacheLoader implements Closeable {
private final OrderedScheduler orderedExecutor =
OrderedScheduler.newSchedulerBuilder().numThreads(8)
.name("pulsar-discovery-ordered-cache").build();
- private final ScheduledExecutorService executor =
Executors.newScheduledThreadPool(8,
- new DefaultThreadFactory("pulsar-discovery-cache"));
public static final String LOADBALANCE_BROKERS_ROOT =
"/loadbalance/brokers";
@@ -74,8 +68,7 @@ public class ZookeeperCacheLoader implements Closeable {
log.error("Shutting down ZK sessions: {}", exitCode);
});
- this.localZkCache = new
LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(),
this.orderedExecutor,
- executor);
+ this.localZkCache = new
LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(),
this.orderedExecutor);
localZkConnectionSvc.start(exitCode -> {
try {
localZkCache.getZooKeeper().close();
@@ -115,7 +108,6 @@ public class ZookeeperCacheLoader implements Closeable {
@Override
public void close() {
orderedExecutor.shutdown();
- executor.shutdown();
}
private void updateBrokerList(Set<String> brokerNodes) throws Exception {
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
index 0ba1ea6..27d5a9b 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
@@ -22,8 +22,6 @@ import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -37,8 +35,6 @@ import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.netty.util.concurrent.DefaultThreadFactory;
-
/**
* Connects with ZooKeeper and sets watch to listen changes for active broker
list.
*
@@ -56,8 +52,6 @@ public class ZookeeperCacheLoader implements Closeable {
private final OrderedScheduler orderedExecutor =
OrderedScheduler.newSchedulerBuilder().numThreads(8)
.name("pulsar-discovery-ordered-cache").build();
- private final ScheduledExecutorService executor =
Executors.newScheduledThreadPool(8,
- new DefaultThreadFactory("pulsar-discovery-cache"));
public static final String LOADBALANCE_BROKERS_ROOT =
"/loadbalance/brokers";
@@ -75,8 +69,7 @@ public class ZookeeperCacheLoader implements Closeable {
log.error("Shutting down ZK sessions: {}", exitCode);
});
- this.localZkCache = new
LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(),
this.orderedExecutor,
- executor);
+ this.localZkCache = new
LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(),
this.orderedExecutor);
localZkConnectionSvc.start(exitCode -> {
try {
localZkCache.getZooKeeper().close();
@@ -121,7 +114,6 @@ public class ZookeeperCacheLoader implements Closeable {
@Override
public void close() {
orderedExecutor.shutdown();
- executor.shutdown();
}
private void updateBrokerList(Set<String> brokerNodes) throws Exception {
diff --git
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java
index bd75e82..b59cde5 100644
---
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java
+++
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java
@@ -28,7 +28,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory.SessionType;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooKeeper;
@@ -51,8 +51,8 @@ public class GlobalZooKeeperCache extends ZooKeeperCache
implements Closeable {
private final ScheduledExecutorService scheduledExecutor;
public GlobalZooKeeperCache(ZooKeeperClientFactory zkClientFactory, int
zkSessionTimeoutMillis,
- String globalZkConnect, OrderedScheduler orderedExecutor,
ScheduledExecutorService scheduledExecutor) {
- super(null, orderedExecutor, scheduledExecutor);
+ String globalZkConnect, OrderedExecutor orderedExecutor,
ScheduledExecutorService scheduledExecutor) {
+ super(null, orderedExecutor);
this.zlClientFactory = zkClientFactory;
this.zkSessionTimeoutMillis = zkSessionTimeoutMillis;
this.globalZkConnect = globalZkConnect;
diff --git
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java
index 23547dd..945b58e 100644
---
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java
+++
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java
@@ -18,9 +18,7 @@
*/
package org.apache.pulsar.zookeeper;
-import java.util.concurrent.ScheduledExecutorService;
-
-import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
@@ -37,9 +35,8 @@ public class LocalZooKeeperCache extends ZooKeeperCache {
private static final Logger LOG =
LoggerFactory.getLogger(LocalZooKeeperCache.class);
- public LocalZooKeeperCache(final ZooKeeper zk, final OrderedScheduler
executor,
- ScheduledExecutorService scheduledExecutor) {
- super(zk, executor, scheduledExecutor);
+ public LocalZooKeeperCache(final ZooKeeper zk, final OrderedExecutor
executor) {
+ super(zk, executor);
}
@Override
diff --git
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
index b2715b3..e2e46aa 100644
---
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
+++
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
@@ -20,6 +20,12 @@ package org.apache.pulsar.zookeeper;
import static com.google.common.base.Preconditions.checkNotNull;
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Sets;
+
import java.nio.file.Paths;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map.Entry;
@@ -29,14 +35,12 @@ import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
@@ -49,14 +53,6 @@ import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.collect.Sets;
-
-import io.netty.util.concurrent.DefaultThreadFactory;
-
/**
* Per ZK client ZooKeeper cache supporting ZNode data and children list
caches. A cache entry is identified, accessed
* and invalidated by the ZNode path. For the data cache, ZNode data parsing
is done at request time with the given
@@ -84,19 +80,18 @@ public abstract class ZooKeeperCache implements Watcher {
protected final AsyncLoadingCache<String, Entry<Object, Stat>> dataCache;
protected final Cache<String, Set<String>> childrenCache;
protected final Cache<String, Boolean> existsCache;
- private final OrderedScheduler executor;
- private final ScheduledExecutorService scheduledExecutor;
- private boolean shouldShutdownExecutor = false;
+ private final OrderedExecutor executor;
+ private final OrderedExecutor backgroundExecutor =
OrderedExecutor.newBuilder().name("zk-cache-background").numThreads(2).build();
+ private boolean shouldShutdownExecutor;
public static final int cacheTimeOutInSec = 30;
protected AtomicReference<ZooKeeper> zkSession = new
AtomicReference<ZooKeeper>(null);
- public ZooKeeperCache(ZooKeeper zkSession, OrderedScheduler executor,
ScheduledExecutorService scheduledExecutor) {
+ public ZooKeeperCache(ZooKeeper zkSession, OrderedExecutor executor) {
checkNotNull(executor);
- checkNotNull(scheduledExecutor);
this.executor = executor;
- this.scheduledExecutor = scheduledExecutor;
this.zkSession.set(zkSession);
+ this.shouldShutdownExecutor = false;
this.dataCache = Caffeine.newBuilder().expireAfterAccess(1,
TimeUnit.HOURS)
.buildAsync((key, executor1) -> null);
@@ -106,8 +101,7 @@ public abstract class ZooKeeperCache implements Watcher {
}
public ZooKeeperCache(ZooKeeper zkSession) {
- this(zkSession,
OrderedScheduler.newSchedulerBuilder().numThreads(1).name("zk-cache-executor").build(),
- Executors.newSingleThreadScheduledExecutor(new
DefaultThreadFactory("zk-cache-callback-executor")));
+ this(zkSession,
OrderedExecutor.newBuilder().name("zk-cache-callback-executor").build());
this.shouldShutdownExecutor = true;
}
@@ -132,7 +126,7 @@ public abstract class ZooKeeperCache implements Watcher {
LOG.debug("Submitting reload cache task to the executor
for path: {}, updater: {}", path, updater);
}
try {
- executor.submitOrdered(path, new SafeRunnable() {
+ executor.executeOrdered(path, new SafeRunnable() {
@Override
public void safeRun() {
updater.reloadCache(path);
@@ -181,7 +175,7 @@ public abstract class ZooKeeperCache implements Watcher {
}
public void asyncInvalidate(String path) {
- scheduledExecutor.submit(() -> invalidate(path));
+ backgroundExecutor.execute(() -> invalidate(path));
}
public void invalidate(final String path) {
@@ -322,20 +316,20 @@ public abstract class ZooKeeperCache implements Watcher {
// Broker doesn't restart on global-zk session lost: so handling
unexpected exception
try {
this.zkSession.get().getData(path, watcher, (rc, path1, ctx,
content, stat) -> {
- Executor exec = scheduledExecutor != null ?
scheduledExecutor : executor;
if (rc == Code.OK.intValue()) {
try {
T obj = deserializer.deserialize(path, content);
// avoid using the zk-client thread to process the
result
- exec.execute(() -> zkFuture.complete(new
SimpleImmutableEntry<Object, Stat>(obj, stat)));
+ executor.execute(
+ () -> zkFuture.complete(new
SimpleImmutableEntry<Object, Stat>(obj, stat)));
} catch (Exception e) {
- exec.execute(() ->
zkFuture.completeExceptionally(e));
+ executor.execute(() ->
zkFuture.completeExceptionally(e));
}
} else if (rc == Code.NONODE.intValue()) {
// Return null values for missing z-nodes, as this is
not "exceptional" condition
- exec.execute(() -> zkFuture.complete(null));
+ executor.execute(() -> zkFuture.complete(null));
} else {
- exec.execute(() ->
zkFuture.completeExceptionally(KeeperException.create(rc)));
+ executor.execute(() ->
zkFuture.completeExceptionally(KeeperException.create(rc)));
}
}, null);
} catch (Exception e) {
@@ -430,7 +424,8 @@ public abstract class ZooKeeperCache implements Watcher {
public void stop() {
if (shouldShutdownExecutor) {
this.executor.shutdown();
- this.scheduledExecutor.shutdown();
}
+
+ this.backgroundExecutor.shutdown();
}
}
diff --git
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImpl.java
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImpl.java
index c0f5299..d18aea9 100644
---
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImpl.java
+++
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImpl.java
@@ -23,21 +23,21 @@ import static
org.apache.bookkeeper.util.SafeRunnable.safeRun;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
-import lombok.extern.slf4j.Slf4j;
-
@Slf4j
public class ZookeeperBkClientFactoryImpl implements ZooKeeperClientFactory {
- private final OrderedScheduler executor;
+ private final OrderedExecutor executor;
- public ZookeeperBkClientFactoryImpl(OrderedScheduler executor) {
+ public ZookeeperBkClientFactoryImpl(OrderedExecutor executor) {
this.executor = executor;
}
@@ -45,7 +45,7 @@ public class ZookeeperBkClientFactoryImpl implements
ZooKeeperClientFactory {
public CompletableFuture<ZooKeeper> create(String serverList, SessionType
sessionType, int zkSessionTimeoutMillis) {
CompletableFuture<ZooKeeper> future = new CompletableFuture<>();
- executor.submit(safeRun(() -> {
+ executor.execute(safeRun(() -> {
try {
ZooKeeper zk =
ZooKeeperClient.newBuilder().connectString(serverList)
.sessionTimeoutMs(zkSessionTimeoutMillis)
diff --git
a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
index e9bad89..af00abc 100644
---
a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
+++
b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
@@ -87,7 +87,7 @@ public class ZookeeperCacheTest {
@Test(timeOut = 10000)
void testSimpleCache() throws Exception {
- ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient,
executor, scheduledExecutor);
+ ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient,
executor);
ZooKeeperDataCache<String> zkCache = new
ZooKeeperDataCache<String>(zkCacheService) {
@Override
public String deserialize(String key, byte[] content) throws
Exception {
@@ -127,7 +127,7 @@ public class ZookeeperCacheTest {
void testChildrenCache() throws Exception {
zkClient.create("/test", new byte[0], null, null);
- ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient,
executor, scheduledExecutor);
+ ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient,
executor);
ZooKeeperChildrenCache cache = new
ZooKeeperChildrenCache(zkCacheService, "/test");
// Create callback counter
@@ -180,7 +180,7 @@ public class ZookeeperCacheTest {
// Check existence after creation of the node
zkClient.create("/test", new byte[0], null, null);
Thread.sleep(20);
- ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient,
executor, scheduledExecutor);
+ ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient,
executor);
boolean exists = zkCacheService.exists("/test");
Assert.assertTrue(exists, "/test should exists in the cache");
@@ -197,7 +197,7 @@ public class ZookeeperCacheTest {
zkClient.create("/test/c1", new byte[0], null, null);
zkClient.create("/test/c2", new byte[0], null, null);
Thread.sleep(20);
- ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient,
executor, scheduledExecutor);
+ ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient,
executor);
boolean exists = zkCacheService.exists("/test");
Assert.assertTrue(exists, "/test should exists in the cache");
@@ -339,7 +339,7 @@ public class ZookeeperCacheTest {
// add readOpDelayMs so, main thread will not serve zkCacahe-returned future and let zkExecutor-thread handle
// callback-result process
MockZooKeeper zkClient = MockZooKeeper.newInstance(zkExecutor, 100);
- ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient,
executor, scheduledExecutor);
+ ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient,
executor);
ZooKeeperDataCache<String> zkCache = new
ZooKeeperDataCache<String>(zkCacheService) {
@Override
public String deserialize(String key, byte[] content) throws
Exception {
@@ -387,7 +387,7 @@ public class ZookeeperCacheTest {
// add readOpDelayMs so, main thread will not serve zkCacahe-returned future and let zkExecutor-thread handle
// callback-result process
MockZooKeeper zkClient = MockZooKeeper.newInstance(zkExecutor, 100);
- ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient,
executor, scheduledExecutor);
+ ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient,
executor);
final AtomicInteger count = new AtomicInteger(0);
ZooKeeperDataCache<String> zkCache = new
ZooKeeperDataCache<String>(zkCacheService) {
--
To stop receiving notification emails like this one, please contact
[email protected].