This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new dba8313 Fallback to use v3 protocol for some types of requests if
they are not implemented in v2
dba8313 is described below
commit dba831375ad0f10821c04258fb08f70cce3f4b67
Author: Yong Zhang <[email protected]>
AuthorDate: Mon May 20 10:51:58 2019 +0800
Fallback to use v3 protocol for some types of requests if they are not
implemented in v2
Descriptions of the changes in this PR:
### Motivation
#2071
### Changes
- Add a client pool use v3 wire protocol
- Obtain client by version
- Currently only support `writeLac` and `readLac`
Master Issue: #2071
Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo
<[email protected]>
This closes #2085 from zymap/compatible_protocol
---
.../apache/bookkeeper/proto/BookieClientImpl.java | 21 +++++---
.../proto/DefaultPerChannelBookieClientPool.java | 62 +++++++++++++++++-----
.../proto/PerChannelBookieClientFactory.java | 5 +-
.../proto/PerChannelBookieClientPool.java | 10 ++++
.../apache/bookkeeper/client/ExplicitLacTest.java | 20 +++++++
5 files changed, 98 insertions(+), 20 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
index c772a97..9cd52e1 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
@@ -93,6 +93,8 @@ public class BookieClientImpl implements BookieClient,
PerChannelBookieClientFac
private final ExtensionRegistry registry;
private final ClientConfiguration conf;
+ private final ClientConfiguration v3Conf;
+ private final boolean useV3Enforced;
private volatile boolean closed;
private final ReentrantReadWriteLock closeLock;
private final StatsLogger statsLogger;
@@ -105,6 +107,9 @@ public class BookieClientImpl implements BookieClient,
PerChannelBookieClientFac
OrderedExecutor executor, ScheduledExecutorService
scheduler,
StatsLogger statsLogger) throws IOException {
this.conf = conf;
+ this.v3Conf = new ClientConfiguration(conf);
+ this.v3Conf.setUseV2WireProtocol(false);
+ this.useV3Enforced = conf.getUseV2WireProtocol();
this.eventLoopGroup = eventLoopGroup;
this.allocator = allocator;
this.executor = executor;
@@ -179,13 +184,17 @@ public class BookieClientImpl implements BookieClient,
PerChannelBookieClientFac
@Override
public PerChannelBookieClient create(BookieSocketAddress address,
PerChannelBookieClientPool pcbcPool,
- SecurityHandlerFactory shFactory) throws SecurityException {
+ SecurityHandlerFactory shFactory, boolean forceUseV3) throws
SecurityException {
StatsLogger statsLoggerForPCBC = statsLogger;
if (conf.getLimitStatsLogging()) {
statsLoggerForPCBC = NullStatsLogger.INSTANCE;
}
- return new PerChannelBookieClient(conf, executor, eventLoopGroup,
allocator, address, statsLoggerForPCBC,
- authProviderFactory, registry, pcbcPool, shFactory);
+ ClientConfiguration clientConfiguration = conf;
+ if (forceUseV3) {
+ clientConfiguration = v3Conf;
+ }
+ return new PerChannelBookieClient(clientConfiguration, executor,
eventLoopGroup, allocator, address,
+ statsLoggerForPCBC, authProviderFactory,
registry, pcbcPool, shFactory);
}
public PerChannelBookieClientPool lookupClient(BookieSocketAddress addr) {
@@ -267,7 +276,7 @@ public class BookieClientImpl implements BookieClient,
PerChannelBookieClientFac
}
toSend.release();
- }, ledgerId);
+ }, ledgerId, useV3Enforced);
}
private void completeAdd(final int rc,
@@ -467,7 +476,7 @@ public class BookieClientImpl implements BookieClient,
PerChannelBookieClientFac
} else {
pcbc.readLac(ledgerId, cb, ctx);
}
- }, ledgerId);
+ }, ledgerId, useV3Enforced);
}
public void readEntry(BookieSocketAddress addr, long ledgerId, long
entryId,
@@ -546,7 +555,7 @@ public class BookieClientImpl implements BookieClient,
PerChannelBookieClientFac
} else {
pcbc.getBookieInfo(requested, cb, ctx);
}
- }, requested);
+ }, requested, useV3Enforced);
}
private void monitorPendingOperations() {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
index 04471d5..71193de 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
@@ -48,6 +48,7 @@ class DefaultPerChannelBookieClientPool implements
PerChannelBookieClientPool,
final BookieSocketAddress address;
final PerChannelBookieClient[] clients;
+ final PerChannelBookieClient[] clientsV3Enforced;
final ClientConfiguration conf;
SecurityHandlerFactory shFactory;
@@ -63,12 +64,20 @@ class DefaultPerChannelBookieClientPool implements
PerChannelBookieClientPool,
this.address = address;
this.conf = conf;
- this.shFactory = SecurityProviderFactoryFactory
- .getSecurityProviderFactory(conf.getTLSProviderFactoryClass());
+ this.shFactory =
SecurityProviderFactoryFactory.getSecurityProviderFactory(conf.getTLSProviderFactoryClass());
this.clients = new PerChannelBookieClient[coreSize];
for (int i = 0; i < coreSize; i++) {
- this.clients[i] = factory.create(address, this, shFactory);
+ this.clients[i] = factory.create(address, this, shFactory, false);
+ }
+
+ if (conf.getUseV2WireProtocol()) {
+ this.clientsV3Enforced = new PerChannelBookieClient[coreSize];
+ for (int i = 0; i < coreSize; i++) {
+ this.clientsV3Enforced[i] = factory.create(address, this,
shFactory, true);
+ }
+ } else {
+ this.clientsV3Enforced = this.clients;
}
}
@@ -85,16 +94,31 @@ class DefaultPerChannelBookieClientPool implements
PerChannelBookieClientPool,
}
private PerChannelBookieClient getClient(long key) {
- if (1 == clients.length) {
- return clients[0];
+ return getClient(key, false);
+ }
+
+ private PerChannelBookieClient getClient(long key,
PerChannelBookieClient[] pcbc) {
+ if (1 == pcbc.length) {
+ return pcbc[0];
}
- int idx = MathUtils.signSafeMod(key, clients.length);
- return clients[idx];
+ int idx = MathUtils.signSafeMod(key, pcbc.length);
+ return pcbc[idx];
+ }
+ private PerChannelBookieClient getClient(long key, boolean forceUseV3) {
+ if (forceUseV3) {
+ return getClient(key, clientsV3Enforced);
+ }
+ return getClient(key, clients);
}
@Override
public void obtain(GenericCallback<PerChannelBookieClient> callback, long
key) {
- getClient(key).connectIfNeededAndDoOp(callback);
+ obtain(callback, key, false);
+ }
+
+ @Override
+ public void obtain(GenericCallback<PerChannelBookieClient> callback, long
key, boolean forceUseV3) {
+ getClient(key, forceUseV3).connectIfNeededAndDoOp(callback);
}
@Override
@@ -106,6 +130,9 @@ class DefaultPerChannelBookieClientPool implements
PerChannelBookieClientPool,
public void checkTimeoutOnPendingOperations() {
for (int i = 0; i < clients.length; i++) {
clients[i].checkTimeoutOnPendingOperations();
+ if (clients != clientsV3Enforced) {
+ clientsV3Enforced[i].checkTimeoutOnPendingOperations();
+ }
}
}
@@ -116,15 +143,21 @@ class DefaultPerChannelBookieClientPool implements
PerChannelBookieClientPool,
@Override
public void disconnect(boolean wait) {
- for (PerChannelBookieClient pcbc : clients) {
- pcbc.disconnect(wait);
+ for (int i = 0; i < clients.length; i++) {
+ clients[i].disconnect();
+ if (clients != clientsV3Enforced) {
+ clientsV3Enforced[i].disconnect();
+ }
}
}
@Override
public void close(boolean wait) {
- for (PerChannelBookieClient pcbc : clients) {
- pcbc.close(wait);
+ for (int i = 0; i < clients.length; i++) {
+ clients[i].close(wait);
+ if (clients != clientsV3Enforced) {
+ clientsV3Enforced[i].close(wait);
+ }
}
}
@@ -134,6 +167,11 @@ class DefaultPerChannelBookieClientPool implements
PerChannelBookieClientPool,
for (PerChannelBookieClient pcbc : clients) {
numPending += pcbc.getNumPendingCompletionRequests();
}
+ if (clients != clientsV3Enforced) {
+ for (PerChannelBookieClient pcbc : clientsV3Enforced) {
+ numPending += pcbc.getNumPendingCompletionRequests();
+ }
+ }
return numPending;
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientFactory.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientFactory.java
index 17abb56..48797cd 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientFactory.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientFactory.java
@@ -36,6 +36,7 @@ interface PerChannelBookieClientFactory {
* @return the client connected to address.
* @throws SecurityException
*/
- PerChannelBookieClient create(BookieSocketAddress address,
- PerChannelBookieClientPool pcbcPool, SecurityHandlerFactory
shFactory) throws SecurityException;
+ PerChannelBookieClient create(BookieSocketAddress address,
PerChannelBookieClientPool pcbcPool,
+ SecurityHandlerFactory shFactory,
+ boolean forceUseV3) throws SecurityException;
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
index aa7a5e9..c7bc005 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
@@ -41,6 +41,16 @@ public interface PerChannelBookieClientPool {
void obtain(GenericCallback<PerChannelBookieClient> callback, long key);
/**
+ * Obtain a channel from channel pool by version to execute operations.
+ *
+ * @param callback
+ * callback to return channel from channel pool
+ * @param forceUseV3
+ * whether or not use v3 protocol for connection
+ */
+ void obtain(GenericCallback<PerChannelBookieClient> callback, long key,
boolean forceUseV3);
+
+ /**
* Returns status of a client.
* It is suggested to delay/throttle requests to this channel if
isWritable is false.
*
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ExplicitLacTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ExplicitLacTest.java
index 5c5c24b..f92cbce 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ExplicitLacTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ExplicitLacTest.java
@@ -340,5 +340,25 @@ public class ExplicitLacTest extends
BookKeeperClusterTestCase {
bkcWithExplicitLAC.close();
}
+ @Test
+ public void fallbackV3() throws Exception {
+ ClientConfiguration v2Conf = new ClientConfiguration();
+ v2Conf.setUseV2WireProtocol(true);
+ v2Conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+ v2Conf.setExplictLacInterval(10);
+
+ BookKeeper bookKeeper = new BookKeeper(v2Conf);
+ LedgerHandle write = (LedgerHandle) bookKeeper.createLedger(1,
+ 1,
+ 1,
+
DigestType.MAC,
+
"pass".getBytes());
+ write.addEntry("test".getBytes());
+ TestUtils.waitUntilExplicitLacUpdated(write, 0);
+ long lac = write.readExplicitLastConfirmed();
+ assertEquals(0, lac);
+ write.close();
+ bookKeeper.close();
+ }
}