merlimat closed pull request #1489: Refactored to reflect changes in BK for
OrderedExecutor
URL: https://github.com/apache/incubator-pulsar/pull/1489
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
index f267a6c06..60ba634e1 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
@@ -188,7 +188,7 @@ public void asyncReadEntry(LedgerHandle lh, PositionImpl
position, final ReadEnt
manager.mlFactoryMBean.recordCacheMiss(1,
returnEntry.getLength());
ml.mbean.addReadEntriesSample(1, returnEntry.getLength());
- ml.getExecutor().submitOrdered(ml.getName(), safeRun(() ->
{
+ ml.getExecutor().executeOrdered(ml.getName(), safeRun(()
-> {
callback.readEntryComplete(returnEntry, obj);
}));
} else {
@@ -254,7 +254,7 @@ public void asyncReadEntry(LedgerHandle lh, long
firstEntry, long lastEntry, boo
checkNotNull(ml.getName());
checkNotNull(ml.getExecutor());
- ml.getExecutor().submitOrdered(ml.getName(), safeRun(() -> {
+ ml.getExecutor().executeOrdered(ml.getName(), safeRun(() -> {
// We got the entries, we need to transform them to a
List<> type
long totalSize = 0;
final List<EntryImpl> entriesToReturn =
Lists.newArrayListWithExpectedSize(entriesToRead);
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
index 7faa18c8e..262cbeb4c 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
@@ -109,7 +109,7 @@ boolean hasSpaceInCache() {
// Trigger a single eviction in background. While the eviction is
running we stop inserting entries in the cache
if (currentSize > evictionTriggerThreshold &&
evictionInProgress.compareAndSet(false, true)) {
- mlFactory.executor.execute(safeRun(() -> {
+ mlFactory.scheduledExecutor.execute(safeRun(() -> {
// Trigger a new cache eviction cycle to bring the used memory
below the cacheEvictionWatermark
// percentage limit
long sizeToEvict = currentSize - (long) (maxSize *
cacheEvictionWatermak);
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 29fe0a724..a2be71d15 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -37,6 +37,7 @@
import com.google.common.collect.TreeRangeSet;
import com.google.common.util.concurrent.RateLimiter;
import com.google.protobuf.InvalidProtocolBufferException;
+
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.List;
@@ -51,6 +52,7 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
+
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
import org.apache.bookkeeper.client.BKException;
@@ -847,7 +849,7 @@ public void asyncResetCursor(Position newPos,
AsyncCallbacks.ResetCursorCallback
final PositionImpl newPosition = (PositionImpl) newPos;
// order trim and reset operations on a ledger
- ledger.getExecutor().submitOrdered(ledger.getName(), safeRun(() -> {
+ ledger.getExecutor().executeOrdered(ledger.getName(), safeRun(() -> {
if (ledger.isValidPosition(newPosition) ||
newPosition.equals(PositionImpl.earliest)
|| newPosition.equals(PositionImpl.latest)) {
internalResetCursor(newPosition, callback);
@@ -1923,7 +1925,7 @@ void createNewMetadataLedger(final VoidCallback callback)
{
bookkeeper.asyncCreateLedger(config.getMetadataEnsemblesize(),
config.getMetadataWriteQuorumSize(),
config.getMetadataAckQuorumSize(), config.getDigestType(),
config.getPassword(), (rc, lh, ctx) -> {
- ledger.getExecutor().submit(safeRun(() -> {
+ ledger.getExecutor().execute(safeRun(() -> {
ledger.mbean.endCursorLedgerCreateOp();
if (rc != BKException.Code.OK) {
log.warn("[{}] Error creating ledger for cursor
{}: {}", ledger.getName(), name,
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 4c0d3d703..d148d1dda 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -23,7 +23,7 @@
import com.google.common.base.Predicates;
import com.google.common.collect.Maps;
-import io.netty.util.concurrent.DefaultThreadFactory;
+
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -31,12 +31,12 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -63,7 +63,6 @@
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange;
import org.apache.bookkeeper.mledger.util.Futures;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
@@ -77,9 +76,9 @@
private final boolean isBookkeeperManaged;
private final ZooKeeper zookeeper;
private final ManagedLedgerFactoryConfig config;
- protected final ScheduledExecutorService executor =
Executors.newScheduledThreadPool(16,
- new DefaultThreadFactory("bookkeeper-ml"));
- private final OrderedScheduler orderedExecutor =
OrderedScheduler.newSchedulerBuilder().numThreads(16)
+ protected final OrderedScheduler scheduledExecutor =
OrderedScheduler.newSchedulerBuilder().numThreads(16)
+ .name("bookkeeper-ml-scheduler").build();
+ private final OrderedExecutor orderedExecutor =
OrderedExecutor.newBuilder().numThreads(16)
.name("bookkeeper-ml-workers").build();
protected final ManagedLedgerFactoryMBeanImpl mbean;
@@ -122,7 +121,7 @@ public ManagedLedgerFactoryImpl(ClientConfiguration
bkClientConfiguration, Manag
this.config = config;
this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
this.entryCacheManager = new EntryCacheManager(this);
- this.statsTask = executor.scheduleAtFixedRate(() -> refreshStats(), 0,
StatsPeriodSeconds, TimeUnit.SECONDS);
+ this.statsTask = scheduledExecutor.scheduleAtFixedRate(() ->
refreshStats(), 0, StatsPeriodSeconds, TimeUnit.SECONDS);
}
public ManagedLedgerFactoryImpl(BookKeeper bookKeeper, ZooKeeper
zooKeeper) throws Exception {
@@ -138,7 +137,7 @@ public ManagedLedgerFactoryImpl(BookKeeper bookKeeper,
ZooKeeper zooKeeper, Mana
this.config = config;
this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
this.entryCacheManager = new EntryCacheManager(this);
- this.statsTask = executor.scheduleAtFixedRate(() -> refreshStats(), 0,
StatsPeriodSeconds, TimeUnit.SECONDS);
+ this.statsTask = scheduledExecutor.scheduleAtFixedRate(() ->
refreshStats(), 0, StatsPeriodSeconds, TimeUnit.SECONDS);
}
private synchronized void refreshStats() {
@@ -232,7 +231,7 @@ public void asyncOpen(final String name, final
ManagedLedgerConfig config, final
ledgers.computeIfAbsent(name, (mlName) -> {
// Create the managed ledger
CompletableFuture<ManagedLedgerImpl> future = new
CompletableFuture<>();
- final ManagedLedgerImpl newledger = new ManagedLedgerImpl(this,
bookKeeper, store, config, executor,
+ final ManagedLedgerImpl newledger = new ManagedLedgerImpl(this,
bookKeeper, store, config, scheduledExecutor,
orderedExecutor, name);
newledger.initialize(new ManagedLedgerInitializeLedgerCallback() {
@Override
@@ -305,7 +304,7 @@ public void closeFailed(ManagedLedgerException exception,
Object ctx) {
}
}
- executor.shutdown();
+ scheduledExecutor.shutdown();
orderedExecutor.shutdown();
entryCacheManager.clear();
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 34283ed0d..4aceb6ca3 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -22,18 +22,26 @@
import static java.lang.Math.min;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
+import com.google.common.collect.BoundType;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+import com.google.common.collect.Range;
+import com.google.common.util.concurrent.RateLimiter;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
-import java.util.Queue;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
@@ -45,6 +53,7 @@
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
@@ -77,23 +86,13 @@
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.bookkeeper.mledger.util.Pair;
import org.apache.pulsar.common.api.Commands;
-import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import
org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.BoundType;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
-import com.google.common.collect.Range;
-import com.google.common.util.concurrent.RateLimiter;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-
public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private final static long MegaByte = 1024 * 1024;
@@ -190,8 +189,8 @@
AtomicReferenceFieldUpdater.newUpdater(ManagedLedgerImpl.class,
State.class, "state");
private volatile State state = null;
- private final ScheduledExecutorService scheduledExecutor;
- private final OrderedScheduler executor;
+ private final OrderedScheduler scheduledExecutor;
+ private final OrderedExecutor executor;
final ManagedLedgerFactoryImpl factory;
protected final ManagedLedgerMBeanImpl mbean;
@@ -204,7 +203,7 @@
// //////////////////////////////////////////////////////////////////////
public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper
bookKeeper, MetaStore store,
- ManagedLedgerConfig config, ScheduledExecutorService
scheduledExecutor, OrderedScheduler orderedExecutor,
+ ManagedLedgerConfig config, OrderedScheduler scheduledExecutor,
OrderedExecutor orderedExecutor,
final String name) {
this.factory = factory;
this.bookKeeper = bookKeeper;
@@ -250,7 +249,7 @@ public void operationComplete(ManagedLedgerInfo mlInfo,
Stat stat) {
if (ledgers.size() > 0) {
final long id = ledgers.lastKey();
OpenCallback opencb = (rc, lh, ctx1) -> {
- executor.submitOrdered(name, safeRun(() -> {
+ executor.executeOrdered(name, safeRun(() -> {
mbean.endDataLedgerOpenOp();
if (log.isDebugEnabled()) {
log.debug("[{}] Opened ledger {}: ", name, id,
BKException.getMessage(rc));
@@ -338,7 +337,7 @@ public void operationFailed(MetaStoreException e) {
mbean.startDataLedgerCreateOp();
bookKeeper.asyncCreateLedger(config.getEnsembleSize(),
config.getWriteQuorumSize(), config.getAckQuorumSize(),
config.getDigestType(), config.getPassword(), (rc, lh, ctx) ->
{
- executor.submitOrdered(name, safeRun(() -> {
+ executor.executeOrdered(name, safeRun(() -> {
mbean.endDataLedgerCreateOp();
if (rc != BKException.Code.OK) {
callback.initializeFailed(createManagedLedgerException(rc));
@@ -1319,7 +1318,7 @@ void asyncReadEntries(OpReadEntry opReadEntry) {
mbean.startDataLedgerOpenOp();
bookKeeper.asyncOpenLedger(ledgerId, config.getDigestType(),
config.getPassword(),
(int rc, LedgerHandle lh, Object ctx) -> {
- executor.submit(safeRun(() -> {
+ executor.executeOrdered(name, safeRun(() -> {
mbean.endDataLedgerOpenOp();
if (rc != BKException.Code.OK) {
// Remove the ledger future from cache to give
chance to reopen it later
@@ -1484,12 +1483,12 @@ void notifyCursors() {
break;
}
- executor.submit(safeRun(() ->
waitingCursor.notifyEntriesAvailable()));
+ executor.execute(safeRun(() ->
waitingCursor.notifyEntriesAvailable()));
}
}
private void trimConsumedLedgersInBackground() {
- executor.submitOrdered(name, safeRun(() -> {
+ executor.executeOrdered(name, safeRun(() -> {
internalTrimConsumedLedgers();
}));
}
@@ -2113,11 +2112,11 @@ private boolean currentLedgerIsFull() {
return ledgers;
}
- ScheduledExecutorService getScheduledExecutor() {
+ OrderedScheduler getScheduledExecutor() {
return scheduledExecutor;
}
- OrderedScheduler getExecutor() {
+ OrderedExecutor getExecutor() {
return executor;
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java
index 7a7697531..31705c1b6 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java
@@ -20,11 +20,16 @@
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
+import com.google.common.base.Charsets;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.TextFormat;
+import com.google.protobuf.TextFormat.ParseException;
+
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
import
org.apache.bookkeeper.mledger.ManagedLedgerException.BadVersionException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
@@ -40,11 +45,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Charsets;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.TextFormat;
-import com.google.protobuf.TextFormat.ParseException;
-
@SuppressWarnings("checkstyle:javadoctype")
public class MetaStoreImplZookeeper implements MetaStore {
@@ -55,7 +55,7 @@
private static final String prefix = prefixName + "/";
private final ZooKeeper zk;
- private final OrderedScheduler executor;
+ private final OrderedExecutor executor;
private static class ZKStat implements Stat {
private final int version;
@@ -90,7 +90,7 @@ public long getModificationTimestamp() {
}
}
- public MetaStoreImplZookeeper(ZooKeeper zk, OrderedScheduler executor)
+ public MetaStoreImplZookeeper(ZooKeeper zk, OrderedExecutor executor)
throws Exception {
this.zk = zk;
this.executor = executor;
@@ -130,7 +130,8 @@ private ManagedLedgerInfo
updateMLInfoTimestamp(ManagedLedgerInfo info) {
@Override
public void getManagedLedgerInfo(final String ledgerName, final
MetaStoreCallback<ManagedLedgerInfo> callback) {
// Try to get the content or create an empty node
- zk.getData(prefix + ledgerName, false, (rc, path, ctx, readData, stat)
-> executor.submit(safeRun(() -> {
+ zk.getData(prefix + ledgerName, false,
+ (rc, path, ctx, readData, stat) ->
executor.executeOrdered(ledgerName, safeRun(() -> {
if (rc == Code.OK.intValue()) {
try {
ManagedLedgerInfo info = parseManagedLedgerInfo(readData);
@@ -171,7 +172,7 @@ public void asyncUpdateLedgerIds(String ledgerName,
ManagedLedgerInfo mlInfo, St
byte[] serializedMlInfo = mlInfo.toByteArray(); // Binary format
zk.setData(prefix + ledgerName, serializedMlInfo, zkStat.getVersion(),
- (rc, path, zkCtx, stat1) -> executor.submit(safeRun(() -> {
+ (rc, path, zkCtx, stat1) ->
executor.executeOrdered(ledgerName, safeRun(() -> {
if (log.isDebugEnabled()) {
log.debug("[{}] UpdateLedgersIdsCallback.processResult
rc={} newVersion={}", ledgerName,
Code.get(rc), stat != null ? stat.getVersion()
: "null");
@@ -195,7 +196,8 @@ public void getCursors(final String ledgerName, final
MetaStoreCallback<List<Str
if (log.isDebugEnabled()) {
log.debug("[{}] Get cursors list", ledgerName);
}
- zk.getChildren(prefix + ledgerName, false, (rc, path, ctx, children,
stat) -> executor.submit(safeRun(() -> {
+ zk.getChildren(prefix + ledgerName, false,
+ (rc, path, ctx, children, stat) ->
executor.executeOrdered(ledgerName, safeRun(() -> {
if (log.isDebugEnabled()) {
log.debug("[{}] getConsumers complete rc={} children={}",
ledgerName, Code.get(rc), children);
}
@@ -219,7 +221,7 @@ public void asyncGetCursorInfo(String ledgerName, String
consumerName,
log.debug("Reading from {}", path);
}
- zk.getData(path, false, (rc, path1, ctx, data, stat) ->
executor.submit(safeRun(() -> {
+ zk.getData(path, false, (rc, path1, ctx, data, stat) ->
executor.executeOrdered(ledgerName, safeRun(() -> {
if (rc != Code.OK.intValue()) {
callback.operationFailed(new
MetaStoreException(KeeperException.create(Code.get(rc))));
} else {
@@ -251,7 +253,7 @@ public void asyncUpdateCursorInfo(final String ledgerName,
final String cursorNa
log.debug("[{}] Creating consumer {} on meta-data store with
{}", ledgerName, cursorName, info);
}
zk.create(path, content, Acl, CreateMode.PERSISTENT,
- (rc, path1, ctx, name) -> executor.submit(safeRun(() -> {
+ (rc, path1, ctx, name) ->
executor.executeOrdered(ledgerName, safeRun(() -> {
if (rc != Code.OK.intValue()) {
log.warn("[{}] Error creating cosumer {} node on
meta-data store with {}: ", ledgerName,
cursorName, info, Code.get(rc));
@@ -269,7 +271,8 @@ public void asyncUpdateCursorInfo(final String ledgerName,
final String cursorNa
if (log.isDebugEnabled()) {
log.debug("[{}] Updating consumer {} on meta-data store with
{}", ledgerName, cursorName, info);
}
- zk.setData(path, content, zkStat.getVersion(), (rc, path1, ctx,
stat1) -> executor.submit(safeRun(() -> {
+ zk.setData(path, content, zkStat.getVersion(),
+ (rc, path1, ctx, stat1) ->
executor.executeOrdered(ledgerName, safeRun(() -> {
if (rc == Code.BADVERSION.intValue()) {
callback.operationFailed(new
BadVersionException(KeeperException.create(Code.get(rc))));
} else if (rc != Code.OK.intValue()) {
@@ -285,7 +288,8 @@ public void asyncUpdateCursorInfo(final String ledgerName,
final String cursorNa
public void asyncRemoveCursor(final String ledgerName, final String
consumerName,
final MetaStoreCallback<Void> callback) {
log.info("[{}] Remove consumer={}", ledgerName, consumerName);
- zk.delete(prefix + ledgerName + "/" + consumerName, -1, (rc, path,
ctx) -> executor.submit(safeRun(() -> {
+ zk.delete(prefix + ledgerName + "/" + consumerName, -1,
+ (rc, path, ctx) -> executor.executeOrdered(ledgerName,
safeRun(() -> {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] zk delete done. rc={}", ledgerName,
consumerName, Code.get(rc));
}
@@ -300,7 +304,7 @@ public void asyncRemoveCursor(final String ledgerName,
final String consumerName
@Override
public void removeManagedLedger(String ledgerName, MetaStoreCallback<Void>
callback) {
log.info("[{}] Remove ManagedLedger", ledgerName);
- zk.delete(prefix + ledgerName, -1, (rc, path, ctx) ->
executor.submit(safeRun(() -> {
+ zk.delete(prefix + ledgerName, -1, (rc, path, ctx) ->
executor.executeOrdered(ledgerName, safeRun(() -> {
if (log.isDebugEnabled()) {
log.debug("[{}] zk delete done. rc={}", ledgerName,
Code.get(rc));
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index 85c4da624..f4daec52e 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -123,14 +123,14 @@ public void addComplete(int rc, final LedgerHandle lh,
long entryId, Object ctx)
// be marked as failed.
ml.mbean.recordAddEntryError();
- ml.getExecutor().submitOrdered(ml.getName(), SafeRun.safeRun(() ->
{
+ ml.getExecutor().executeOrdered(ml.getName(), SafeRun.safeRun(()
-> {
// Force the creation of a new ledger. Doing it in a
background thread to avoid acquiring ML lock
// from a BK callback.
ml.ledgerClosed(lh);
}));
} else {
// Trigger addComplete callback in a thread hashed on the managed
ledger name
- ml.getExecutor().submitOrdered(ml.getName(), this);
+ ml.getExecutor().executeOrdered(ml.getName(), this);
}
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
index 31d1c79b3..6b3a03adb 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
@@ -83,7 +83,7 @@ public void readEntriesFailed(ManagedLedgerException
exception, Object ctx) {
if (!entries.isEmpty()) {
// There were already some entries that were read before, we can
return them
- cursor.ledger.getExecutor().submit(safeRun(() -> {
+ cursor.ledger.getExecutor().execute(safeRun(() -> {
callback.readEntriesComplete(entries, ctx);
recycle();
}));
@@ -131,7 +131,7 @@ void checkReadCompletion() {
}
// Schedule next read in a different thread
- cursor.ledger.getExecutor().submit(safeRun(() -> {
+ cursor.ledger.getExecutor().execute(safeRun(() -> {
readPosition =
cursor.ledger.startReadOperationOnLedger(nextReadPosition);
cursor.ledger.asyncReadEntries(OpReadEntry.this);
}));
@@ -139,7 +139,7 @@ void checkReadCompletion() {
// The reading was already completed, release resources and
trigger callback
cursor.readOperationCompleted();
- cursor.ledger.getExecutor().submit(safeRun(() -> {
+ cursor.ledger.getExecutor().execute(safeRun(() -> {
callback.readEntriesComplete(entries, ctx);
recycle();
}));
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
index 107abcf75..47c515d13 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
@@ -24,9 +24,9 @@
import static org.testng.Assert.assertTrue;
import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
@@ -41,7 +41,7 @@
@BeforeClass
void setup() throws Exception {
- ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor();
+ OrderedScheduler executor =
OrderedScheduler.newSchedulerBuilder().numThreads(1).build();
ml1 = mock(ManagedLedgerImpl.class);
when(ml1.getScheduledExecutor()).thenReturn(executor);
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanTest.java
index c3e75c2e7..ebaa1fdab 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanTest.java
@@ -65,7 +65,7 @@ public void simple() throws Exception {
mbean.addLedgerSwitchLatencySample(1, TimeUnit.SECONDS);
// Simulate stats getting update from different thread
- factory.executor.submit(() -> {
+ factory.scheduledExecutor.submit(() -> {
mbean.refreshStats(1, TimeUnit.SECONDS);
}).get();
@@ -90,7 +90,7 @@ public void simple() throws Exception {
ledger.addEntry(new byte[600]);
cursor.markDelete(p1);
- factory.executor.submit(() -> {
+ factory.scheduledExecutor.submit(() -> {
mbean.refreshStats(1, TimeUnit.SECONDS);
}).get();
@@ -109,7 +109,7 @@ public void simple() throws Exception {
mbean.recordAddEntryError();
mbean.recordReadEntriesError();
- factory.executor.submit(() -> {
+ factory.scheduledExecutor.submit(() -> {
mbean.refreshStats(1, TimeUnit.SECONDS);
}).get();
@@ -119,7 +119,7 @@ public void simple() throws Exception {
List<Entry> entries = cursor.readEntries(100);
assertEquals(entries.size(), 1);
- factory.executor.submit(() -> {
+ factory.scheduledExecutor.submit(() -> {
mbean.refreshStats(1, TimeUnit.SECONDS);
}).get();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 58010e8d7..47753e6fc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -38,6 +38,7 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
@@ -106,8 +107,8 @@
new DefaultThreadFactory("pulsar"));
private final ScheduledExecutorService cacheExecutor =
Executors.newScheduledThreadPool(10,
new DefaultThreadFactory("zk-cache-callback"));
- private final OrderedScheduler orderedExecutor =
OrderedScheduler.newSchedulerBuilder().numThreads(8)
- .name("pulsar-ordered").build();
+ private final OrderedExecutor orderedExecutor =
OrderedExecutor.newBuilder().numThreads(8).name("pulsar-ordered")
+ .build();
private final ScheduledExecutorService loadManagerExecutor;
private ScheduledFuture<?> loadReportTask = null;
private ScheduledFuture<?> loadSheddingTask = null;
@@ -445,7 +446,7 @@ private void startZkCacheService() throws
PulsarServerException {
LOG.info("starting configuration cache service");
- this.localZkCache = new LocalZooKeeperCache(getZkClient(),
getOrderedExecutor(), this.cacheExecutor);
+ this.localZkCache = new LocalZooKeeperCache(getZkClient(),
getOrderedExecutor());
this.globalZkCache = new
GlobalZooKeeperCache(getZooKeeperClientFactory(),
(int) config.getZooKeeperSessionTimeoutMillis(),
config.getGlobalZookeeperServers(),
getOrderedExecutor(), this.cacheExecutor);
@@ -611,7 +612,7 @@ public ScheduledExecutorService getLoadManagerExecutor() {
return loadManagerExecutor;
}
- public OrderedScheduler getOrderedExecutor() {
+ public OrderedExecutor getOrderedExecutor() {
return orderedExecutor;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index fe307136f..2668f9540 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -636,8 +636,8 @@ void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
if (t != null) {
// retry several times on BadVersion
if ((t instanceof ServerMetadataException) &&
(counter.decrementAndGet() >= 0)) {
- pulsar.getOrderedExecutor().submit(
- () -> splitAndOwnBundleOnceAndRetry(bundle, unload,
counter, unloadFuture));
+ pulsar.getOrderedExecutor()
+ .execute(() ->
splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture));
} else {
// Retry enough, or meet other exception
String msg2 = format(" %s not success update nsBundles,
counter %d, reason %s",
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index e1a0d3f0f..6c2e011ea 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -476,7 +476,7 @@ public void unloadNamespaceBundlesGracefully() {
replicationFuture.exceptionally((ex) -> {
log.warn("Replication check failed. Removing topic from topics
list {}, {}", topic, ex);
nonPersistentTopic.stopReplProducers().whenComplete((v, exception)
-> {
- pulsar.getExecutor().submit(() -> topics.remove(topic,
topicFuture));
+ pulsar.getExecutor().execute(() -> topics.remove(topic,
topicFuture));
topicFuture.completeExceptionally(ex);
});
@@ -579,7 +579,7 @@ private void createPersistentTopic(final String topic,
CompletableFuture<Topic>
// namespace is being unloaded
String msg = String.format("Namespace is being unloaded, cannot
add topic %s", topic);
log.warn(msg);
- pulsar.getExecutor().submit(() -> topics.remove(topic,
topicFuture));
+ pulsar.getExecutor().execute(() -> topics.remove(topic,
topicFuture));
topicFuture.completeExceptionally(new
ServiceUnitNotReadyException(msg));
return;
}
@@ -618,7 +618,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object
ctx) {
});
} catch (NamingException e) {
log.warn("Failed to create topic {}-{}",
topic, e.getMessage());
- pulsar.getExecutor().submit(() ->
topics.remove(topic, topicFuture));
+ pulsar.getExecutor().execute(() ->
topics.remove(topic, topicFuture));
topicFuture.completeExceptionally(e);
}
}
@@ -626,7 +626,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object
ctx) {
@Override
public void openLedgerFailed(ManagedLedgerException
exception, Object ctx) {
log.warn("Failed to create topic {}", topic,
exception);
- pulsar.getExecutor().submit(() ->
topics.remove(topic, topicFuture));
+ pulsar.getExecutor().execute(() ->
topics.remove(topic, topicFuture));
topicFuture.completeExceptionally(new
PersistenceException(exception));
}
}, null);
@@ -635,7 +635,7 @@ public void openLedgerFailed(ManagedLedgerException
exception, Object ctx) {
log.warn("[{}] Failed to get topic configuration: {}", topic,
exception.getMessage(), exception);
// remove topic from topics-map in different thread to avoid
possible deadlock if
// createPersistentTopic-thread only tries to handle this
future-result
- pulsar.getExecutor().submit(() -> topics.remove(topic,
topicFuture));
+ pulsar.getExecutor().execute(() -> topics.remove(topic,
topicFuture));
topicFuture.completeExceptionally(exception);
return null;
});
@@ -644,7 +644,7 @@ public void openLedgerFailed(ManagedLedgerException
exception, Object ctx) {
public CompletableFuture<ManagedLedgerConfig>
getManagedLedgerConfig(TopicName topicName) {
CompletableFuture<ManagedLedgerConfig> future = new
CompletableFuture<>();
// Execute in background thread, since getting the policies might
block if the z-node wasn't already cached
- pulsar.getOrderedExecutor().submitOrdered(topicName, safeRun(() -> {
+ pulsar.getOrderedExecutor().executeOrdered(topicName, safeRun(() -> {
NamespaceName namespace = topicName.getNamespaceObject();
ServiceConfiguration serviceConfig = pulsar.getConfiguration();
@@ -1104,7 +1104,7 @@ private void updateConfigurationAndRegisterListeners() {
}
private void updateTopicMessageDispatchRate() {
- this.pulsar().getExecutor().submit(() -> {
+ this.pulsar().getExecutor().execute(() -> {
// update message-rate for each topic
topics.forEach((name, topicFuture) -> {
if (topicFuture.isDone()) {
@@ -1150,7 +1150,7 @@ private void updateSubscriptionMessageDispatchRate() {
}
private void updateManagedLedgerConfig() {
- this.pulsar().getExecutor().submit(() -> {
+ this.pulsar().getExecutor().execute(() -> {
// update managed-ledger config of each topic
topics.forEach((name, topicFuture) -> {
if (topicFuture.isDone()) {
@@ -1403,7 +1403,7 @@ public void checkUnAckMessageDispatching() {
// block dispatcher with higher unack-msg when it reaches
broker-unack msg limit
log.info("[{}] Starting blocking dispatchers with unacked msgs {}
due to reached max broker limit {}",
maxUnackedMessages, maxUnackedMsgsPerDispatcher);
- executor().submit(() -> blockDispatchersWithLargeUnAckMessages());
+ executor().execute(() -> blockDispatchersWithLargeUnAckMessages());
} else if (blockedDispatcherOnHighUnackedMsgs.get() && unAckedMessages
< maxUnackedMessages / 2) {
// unblock broker-dispatching if received enough acked messages
back
if (blockedDispatcherOnHighUnackedMsgs.compareAndSet(true, false))
{
@@ -1457,7 +1457,7 @@ public void
unblockDispatchersOnUnAckMessages(List<PersistentDispatcherMultipleC
try {
dispatcherList.forEach(dispatcher -> {
dispatcher.unBlockDispatcherOnUnackedMsgs();
- executor().submit(() -> dispatcher.readMoreEntries());
+ executor().execute(() -> dispatcher.readMoreEntries());
log.info("[{}] Dispatcher is unblocked", dispatcher.getName());
blockedDispatchers.remove(dispatcher);
});
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index bada69ca2..c052e4c58 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -955,7 +955,7 @@ protected void handleSend(CommandSend send, ByteBuf
headersAndPayload) {
if (nonPersistentPendingMessages >
MaxNonPersistentPendingMessages) {
final long producerId = send.getProducerId();
final long sequenceId = send.getSequenceId();
-
service.getTopicOrderedExecutor().submitOrdered(producer.getTopic().getName(),
SafeRun.safeRun(() -> {
+
service.getTopicOrderedExecutor().executeOrdered(producer.getTopic().getName(),
SafeRun.safeRun(() -> {
ctx.writeAndFlush(Commands.newSendReceipt(producerId,
sequenceId, -1, -1), ctx.voidPromise());
}));
producer.recordMessageDrop(send.getNumMessages());
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index fc6a920ef..9eb7ce7d0 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -27,8 +27,10 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.FastThreadLocal;
+
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -39,6 +41,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
@@ -190,7 +193,7 @@ public void publishMessage(ByteBuf data, PublishContext
callback) {
// retain data for sub/replication because io-thread will release
actual payload
data.retain(2);
- this.executor.submitOrdered(topic, SafeRun.safeRun(() -> {
+ this.executor.executeOrdered(topic, SafeRun.safeRun(() -> {
subscriptions.forEach((name, subscription) -> {
ByteBuf duplicateBuffer = data.retainedDuplicate();
Entry entry = create(0L, 0L, duplicateBuffer);
@@ -210,7 +213,7 @@ public void publishMessage(ByteBuf data, PublishContext
callback) {
}
}));
- this.executor.submitOrdered(topic, SafeRun.safeRun(() -> {
+ this.executor.executeOrdered(topic, SafeRun.safeRun(() -> {
replicators.forEach((name, replicator) -> {
ByteBuf duplicateBuffer = data.retainedDuplicate();
Entry entry = create(0L, 0L, duplicateBuffer);
@@ -487,7 +490,7 @@ void removeSubscription(String subscriptionName) {
FutureUtil.waitForAll(futures).thenRun(() -> {
log.info("[{}] Topic closed", topic);
- brokerService.pulsar().getExecutor().submit(() ->
brokerService.removeTopicFromCache(topic));
+ brokerService.pulsar().getExecutor().execute(() ->
brokerService.removeTopicFromCache(topic));
closeFuture.complete(null);
}).exceptionally(exception -> {
log.error("[{}] Error closing topic", topic, exception);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index e7e50b8cb..443d29bde 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -152,7 +152,7 @@ public void readEntriesComplete(List<Entry> entries, Object
ctx) {
if (managedCursor.hasMoreEntries()) {
// Read next batch of entries
- pulsar.getExecutor().submit(() -> replayCursor(future));
+ pulsar.getExecutor().execute(() -> replayCursor(future));
} else {
// Done replaying
future.complete(null);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 2eb83ae29..17c5db7a6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -589,7 +589,7 @@ public void addUnAckedMessages(int numberOfMessages) {
// unblock dispatcher if it acks back enough messages
if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this,
TRUE, FALSE)) {
log.info("[{}] Dispatcher is unblocked", name);
- topic.getBrokerService().executor().submit(() ->
readMoreEntries());
+ topic.getBrokerService().executor().execute(() ->
readMoreEntries());
}
}
// increment broker-level count
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/SameThreadOrderedSafeExecutor.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/SameThreadOrderedSafeExecutor.java
index 98cafc547..290985725 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/SameThreadOrderedSafeExecutor.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/SameThreadOrderedSafeExecutor.java
@@ -18,30 +18,30 @@
*/
package org.apache.pulsar.broker.auth;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
+import io.netty.util.concurrent.DefaultThreadFactory;
+
+import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.SafeRunnable;
import org.apache.bookkeeper.stats.NullStatsLogger;
-import io.netty.util.concurrent.DefaultThreadFactory;
-
-public class SameThreadOrderedSafeExecutor extends OrderedScheduler {
+public class SameThreadOrderedSafeExecutor extends OrderedExecutor {
public SameThreadOrderedSafeExecutor() {
super("same-thread-executor", 1, new DefaultThreadFactory("test"),
NullStatsLogger.INSTANCE, false, 100000, 10);
}
@Override
- public void submit(SafeRunnable r) {
+ public void execute(Runnable r) {
r.run();
}
@Override
- public void submitOrdered(int orderingKey, SafeRunnable r) {
+ public void executeOrdered(int orderingKey, SafeRunnable r) {
r.run();
}
@Override
- public void submitOrdered(long orderingKey, SafeRunnable r) {
+ public void executeOrdered(long orderingKey, SafeRunnable r) {
r.run();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java
index 9fd2fb7ac..2fe63e8ef 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java
@@ -25,8 +25,7 @@
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
+import com.google.common.hash.Hashing;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.broker.PulsarService;
@@ -43,8 +42,6 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import com.google.common.hash.Hashing;
-
public class ResourceQuotaCacheTest {
private PulsarService pulsar;
@@ -52,14 +49,12 @@
private LocalZooKeeperCacheService localCache;
private NamespaceBundleFactory bundleFactory;
private OrderedScheduler executor;
- private ScheduledExecutorService scheduledExecutor;
@BeforeMethod
public void setup() throws Exception {
pulsar = mock(PulsarService.class);
executor =
OrderedScheduler.newSchedulerBuilder().numThreads(1).name("test").build();
- scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
- zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(),
executor, scheduledExecutor);
+ zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(),
executor);
localCache = new LocalZooKeeperCacheService(zkCache, null);
// set mock pulsar localzkcache
@@ -77,7 +72,6 @@ public void setup() throws Exception {
@AfterMethod
public void teardown() {
executor.shutdown();
- scheduledExecutor.shutdown();
}
@Test
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
index e8d4bb909..dfca1b2c0 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
@@ -33,10 +33,10 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import com.google.common.hash.Hashing;
+
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.common.util.OrderedScheduler;
@@ -58,8 +58,6 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import com.google.common.hash.Hashing;
-
public class OwnershipCacheTest {
private PulsarService pulsar;
@@ -71,7 +69,6 @@
private NamespaceService nsService;
private BrokerService brokerService;
private OrderedScheduler executor;
- private ScheduledExecutorService scheduledExecutor;
@BeforeMethod
public void setup() throws Exception {
@@ -80,8 +77,7 @@ public void setup() throws Exception {
pulsar = mock(PulsarService.class);
config = mock(ServiceConfiguration.class);
executor =
OrderedScheduler.newSchedulerBuilder().numThreads(1).name("test").build();
- scheduledExecutor = Executors.newScheduledThreadPool(2);
- zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(),
executor, scheduledExecutor);
+ zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(),
executor);
localCache = spy(new LocalZooKeeperCacheService(zkCache, null));
ZooKeeperDataCache<LocalPolicies> poilciesCache =
mock(ZooKeeperDataCache.class);
when(pulsar.getLocalZkCacheService()).thenReturn(localCache);
@@ -106,7 +102,6 @@ public void setup() throws Exception {
@AfterMethod
public void teardown() throws Exception {
executor.shutdown();
- scheduledExecutor.shutdown();
}
@Test
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index c468298ec..2cbd64224 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -655,7 +655,7 @@ void activeConsumerChanged(boolean isActive) {
return;
}
- listenerExecutor.submit(() -> {
+ listenerExecutor.execute(() -> {
if (isActive) {
consumerEventListener.becameActive(this, partitionIndex);
} else {
diff --git
a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
index a6e3cbce2..9a4b9aaf2 100644
---
a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
+++
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
@@ -31,7 +31,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.common.util.OrderedScheduler;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.common.naming.TopicName;
diff --git
a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java
index 30f75be6b..1371d0580 100644
---
a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java
+++
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java
@@ -22,8 +22,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -37,8 +35,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.netty.util.concurrent.DefaultThreadFactory;
-
/**
* Connects with ZooKeeper and sets watch to listen changes for active broker
list.
*
@@ -55,8 +51,6 @@
private final OrderedScheduler orderedExecutor =
OrderedScheduler.newSchedulerBuilder().numThreads(8)
.name("pulsar-discovery-ordered-cache").build();
- private final ScheduledExecutorService executor =
Executors.newScheduledThreadPool(8,
- new DefaultThreadFactory("pulsar-discovery-cache"));
public static final String LOADBALANCE_BROKERS_ROOT =
"/loadbalance/brokers";
@@ -74,8 +68,7 @@ public ZookeeperCacheLoader(ZooKeeperClientFactory
zkClientFactory, String zooke
log.error("Shutting down ZK sessions: {}", exitCode);
});
- this.localZkCache = new
LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(),
this.orderedExecutor,
- executor);
+ this.localZkCache = new
LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(),
this.orderedExecutor);
localZkConnectionSvc.start(exitCode -> {
try {
localZkCache.getZooKeeper().close();
@@ -115,7 +108,6 @@ public ZooKeeperCache getLocalZkCache() {
@Override
public void close() {
orderedExecutor.shutdown();
- executor.shutdown();
}
private void updateBrokerList(Set<String> brokerNodes) throws Exception {
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
index 0ba1ea63c..27d5a9b74 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
@@ -22,8 +22,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -37,8 +35,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.netty.util.concurrent.DefaultThreadFactory;
-
/**
* Connects with ZooKeeper and sets watch to listen changes for active broker
list.
*
@@ -56,8 +52,6 @@
private final OrderedScheduler orderedExecutor =
OrderedScheduler.newSchedulerBuilder().numThreads(8)
.name("pulsar-discovery-ordered-cache").build();
- private final ScheduledExecutorService executor =
Executors.newScheduledThreadPool(8,
- new DefaultThreadFactory("pulsar-discovery-cache"));
public static final String LOADBALANCE_BROKERS_ROOT =
"/loadbalance/brokers";
@@ -75,8 +69,7 @@ public ZookeeperCacheLoader(ZooKeeperClientFactory
zkClientFactory, String zooke
log.error("Shutting down ZK sessions: {}", exitCode);
});
- this.localZkCache = new
LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(),
this.orderedExecutor,
- executor);
+ this.localZkCache = new
LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(),
this.orderedExecutor);
localZkConnectionSvc.start(exitCode -> {
try {
localZkCache.getZooKeeper().close();
@@ -121,7 +114,6 @@ public ZooKeeperCache getLocalZkCache() {
@Override
public void close() {
orderedExecutor.shutdown();
- executor.shutdown();
}
private void updateBrokerList(Set<String> brokerNodes) throws Exception {
diff --git
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java
index bd75e820e..b59cde569 100644
---
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java
+++
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java
@@ -28,7 +28,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory.SessionType;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooKeeper;
@@ -51,8 +51,8 @@
private final ScheduledExecutorService scheduledExecutor;
public GlobalZooKeeperCache(ZooKeeperClientFactory zkClientFactory, int
zkSessionTimeoutMillis,
- String globalZkConnect, OrderedScheduler orderedExecutor,
ScheduledExecutorService scheduledExecutor) {
- super(null, orderedExecutor, scheduledExecutor);
+ String globalZkConnect, OrderedExecutor orderedExecutor,
ScheduledExecutorService scheduledExecutor) {
+ super(null, orderedExecutor);
this.zlClientFactory = zkClientFactory;
this.zkSessionTimeoutMillis = zkSessionTimeoutMillis;
this.globalZkConnect = globalZkConnect;
diff --git
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java
index 23547dd24..945b58e53 100644
---
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java
+++
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java
@@ -18,9 +18,7 @@
*/
package org.apache.pulsar.zookeeper;
-import java.util.concurrent.ScheduledExecutorService;
-
-import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
@@ -37,9 +35,8 @@
private static final Logger LOG =
LoggerFactory.getLogger(LocalZooKeeperCache.class);
- public LocalZooKeeperCache(final ZooKeeper zk, final OrderedScheduler
executor,
- ScheduledExecutorService scheduledExecutor) {
- super(zk, executor, scheduledExecutor);
+ public LocalZooKeeperCache(final ZooKeeper zk, final OrderedExecutor
executor) {
+ super(zk, executor);
}
@Override
diff --git
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
index b2715b3f2..e2e46aa8f 100644
---
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
+++
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
@@ -20,6 +20,12 @@
import static com.google.common.base.Preconditions.checkNotNull;
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Sets;
+
import java.nio.file.Paths;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map.Entry;
@@ -29,14 +35,12 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
@@ -49,14 +53,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.collect.Sets;
-
-import io.netty.util.concurrent.DefaultThreadFactory;
-
/**
* Per ZK client ZooKeeper cache supporting ZNode data and children list
caches. A cache entry is identified, accessed
* and invalidated by the ZNode path. For the data cache, ZNode data parsing
is done at request time with the given
@@ -84,19 +80,18 @@
protected final AsyncLoadingCache<String, Entry<Object, Stat>> dataCache;
protected final Cache<String, Set<String>> childrenCache;
protected final Cache<String, Boolean> existsCache;
- private final OrderedScheduler executor;
- private final ScheduledExecutorService scheduledExecutor;
- private boolean shouldShutdownExecutor = false;
+ private final OrderedExecutor executor;
+ private final OrderedExecutor backgroundExecutor =
OrderedExecutor.newBuilder().name("zk-cache-background").numThreads(2).build();
+ private boolean shouldShutdownExecutor;
public static final int cacheTimeOutInSec = 30;
protected AtomicReference<ZooKeeper> zkSession = new
AtomicReference<ZooKeeper>(null);
- public ZooKeeperCache(ZooKeeper zkSession, OrderedScheduler executor,
ScheduledExecutorService scheduledExecutor) {
+ public ZooKeeperCache(ZooKeeper zkSession, OrderedExecutor executor) {
checkNotNull(executor);
- checkNotNull(scheduledExecutor);
this.executor = executor;
- this.scheduledExecutor = scheduledExecutor;
this.zkSession.set(zkSession);
+ this.shouldShutdownExecutor = false;
this.dataCache = Caffeine.newBuilder().expireAfterAccess(1,
TimeUnit.HOURS)
.buildAsync((key, executor1) -> null);
@@ -106,8 +101,7 @@ public ZooKeeperCache(ZooKeeper zkSession, OrderedScheduler
executor, ScheduledE
}
public ZooKeeperCache(ZooKeeper zkSession) {
- this(zkSession,
OrderedScheduler.newSchedulerBuilder().numThreads(1).name("zk-cache-executor").build(),
- Executors.newSingleThreadScheduledExecutor(new
DefaultThreadFactory("zk-cache-callback-executor")));
+ this(zkSession,
OrderedExecutor.newBuilder().name("zk-cache-callback-executor").build());
this.shouldShutdownExecutor = true;
}
@@ -132,7 +126,7 @@ public ZooKeeper getZooKeeper() {
LOG.debug("Submitting reload cache task to the executor
for path: {}, updater: {}", path, updater);
}
try {
- executor.submitOrdered(path, new SafeRunnable() {
+ executor.executeOrdered(path, new SafeRunnable() {
@Override
public void safeRun() {
updater.reloadCache(path);
@@ -181,7 +175,7 @@ private void invalidateExists(String path) {
}
public void asyncInvalidate(String path) {
- scheduledExecutor.submit(() -> invalidate(path));
+ backgroundExecutor.execute(() -> invalidate(path));
}
public void invalidate(final String path) {
@@ -322,20 +316,20 @@ public Boolean call() throws Exception {
// Broker doesn't restart on global-zk session lost: so handling
unexpected exception
try {
this.zkSession.get().getData(path, watcher, (rc, path1, ctx,
content, stat) -> {
- Executor exec = scheduledExecutor != null ?
scheduledExecutor : executor;
if (rc == Code.OK.intValue()) {
try {
T obj = deserializer.deserialize(path, content);
// avoid using the zk-client thread to process the
result
- exec.execute(() -> zkFuture.complete(new
SimpleImmutableEntry<Object, Stat>(obj, stat)));
+ executor.execute(
+ () -> zkFuture.complete(new
SimpleImmutableEntry<Object, Stat>(obj, stat)));
} catch (Exception e) {
- exec.execute(() ->
zkFuture.completeExceptionally(e));
+ executor.execute(() ->
zkFuture.completeExceptionally(e));
}
} else if (rc == Code.NONODE.intValue()) {
// Return null values for missing z-nodes, as this is
not "exceptional" condition
- exec.execute(() -> zkFuture.complete(null));
+ executor.execute(() -> zkFuture.complete(null));
} else {
- exec.execute(() ->
zkFuture.completeExceptionally(KeeperException.create(rc)));
+ executor.execute(() ->
zkFuture.completeExceptionally(KeeperException.create(rc)));
}
}, null);
} catch (Exception e) {
@@ -430,7 +424,8 @@ public void invalidateRoot(String root) {
public void stop() {
if (shouldShutdownExecutor) {
this.executor.shutdown();
- this.scheduledExecutor.shutdown();
}
+
+ this.backgroundExecutor.shutdown();
}
}
diff --git
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImpl.java
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImpl.java
index c0f529999..d18aea93e 100644
---
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImpl.java
+++
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImpl.java
@@ -23,21 +23,21 @@
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
-import lombok.extern.slf4j.Slf4j;
-
@Slf4j
public class ZookeeperBkClientFactoryImpl implements ZooKeeperClientFactory {
- private final OrderedScheduler executor;
+ private final OrderedExecutor executor;
- public ZookeeperBkClientFactoryImpl(OrderedScheduler executor) {
+ public ZookeeperBkClientFactoryImpl(OrderedExecutor executor) {
this.executor = executor;
}
@@ -45,7 +45,7 @@ public ZookeeperBkClientFactoryImpl(OrderedScheduler
executor) {
public CompletableFuture<ZooKeeper> create(String serverList, SessionType
sessionType, int zkSessionTimeoutMillis) {
CompletableFuture<ZooKeeper> future = new CompletableFuture<>();
- executor.submit(safeRun(() -> {
+ executor.execute(safeRun(() -> {
try {
ZooKeeper zk =
ZooKeeperClient.newBuilder().connectString(serverList)
.sessionTimeoutMs(zkSessionTimeoutMillis)
diff --git
a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
index e9bad89f5..af00abc42 100644
---
a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
+++
b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
@@ -87,7 +87,7 @@ void classTeardown() throws Exception {
@Test(timeOut = 10000)
void testSimpleCache() throws Exception {
- ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient,
executor, scheduledExecutor);
+ ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient,
executor);
ZooKeeperDataCache<String> zkCache = new
ZooKeeperDataCache<String>(zkCacheService) {
@Override
public String deserialize(String key, byte[] content) throws
Exception {
@@ -127,7 +127,7 @@ public String deserialize(String key, byte[] content)
throws Exception {
void testChildrenCache() throws Exception {
zkClient.create("/test", new byte[0], null, null);
- ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient,
executor, scheduledExecutor);
+ ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient,
executor);
ZooKeeperChildrenCache cache = new
ZooKeeperChildrenCache(zkCacheService, "/test");
// Create callback counter
@@ -180,7 +180,7 @@ void testExistsCache() throws Exception {
// Check existence after creation of the node
zkClient.create("/test", new byte[0], null, null);
Thread.sleep(20);
- ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient,
executor, scheduledExecutor);
+ ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient,
executor);
boolean exists = zkCacheService.exists("/test");
Assert.assertTrue(exists, "/test should exists in the cache");
@@ -197,7 +197,7 @@ void testInvalidateCache() throws Exception {
zkClient.create("/test/c1", new byte[0], null, null);
zkClient.create("/test/c2", new byte[0], null, null);
Thread.sleep(20);
- ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient,
executor, scheduledExecutor);
+ ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient,
executor);
boolean exists = zkCacheService.exists("/test");
Assert.assertTrue(exists, "/test should exists in the cache");
@@ -339,7 +339,7 @@ void testZkCallbackThreadStuck() throws Exception {
// add readOpDelayMs so, main thread will not serve zkCacahe-returned
future and let zkExecutor-thread handle
// callback-result process
MockZooKeeper zkClient = MockZooKeeper.newInstance(zkExecutor, 100);
- ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient,
executor, scheduledExecutor);
+ ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient,
executor);
ZooKeeperDataCache<String> zkCache = new
ZooKeeperDataCache<String>(zkCacheService) {
@Override
public String deserialize(String key, byte[] content) throws
Exception {
@@ -387,7 +387,7 @@ public void testInvalidateCacheOnFailure() throws Exception
{
// add readOpDelayMs so, main thread will not serve zkCacahe-returned
future and let zkExecutor-thread handle
// callback-result process
MockZooKeeper zkClient = MockZooKeeper.newInstance(zkExecutor, 100);
- ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient,
executor, scheduledExecutor);
+ ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient,
executor);
final AtomicInteger count = new AtomicInteger(0);
ZooKeeperDataCache<String> zkCache = new
ZooKeeperDataCache<String>(zkCacheService) {
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services