This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new dba8313  Fallback to use v3 protocol for some types of requests if 
they are not implemented in v2
dba8313 is described below

commit dba831375ad0f10821c04258fb08f70cce3f4b67
Author: Yong Zhang <[email protected]>
AuthorDate: Mon May 20 10:51:58 2019 +0800

    Fallback to use v3 protocol for some types of requests if they are not 
implemented in v2
    
    Descriptions of the changes in this PR:
    
    ### Motivation
    
    #2071
    
    ### Changes
    
    - Add a client pool use v3 wire protocol
    - Obtain client by version
    - Currently only support `writeLac` and `readLac`
    
    Master Issue: #2071
    
    Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo 
<[email protected]>
    
    This closes #2085 from zymap/compatible_protocol
---
 .../apache/bookkeeper/proto/BookieClientImpl.java  | 21 +++++---
 .../proto/DefaultPerChannelBookieClientPool.java   | 62 +++++++++++++++++-----
 .../proto/PerChannelBookieClientFactory.java       |  5 +-
 .../proto/PerChannelBookieClientPool.java          | 10 ++++
 .../apache/bookkeeper/client/ExplicitLacTest.java  | 20 +++++++
 5 files changed, 98 insertions(+), 20 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
index c772a97..9cd52e1 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
@@ -93,6 +93,8 @@ public class BookieClientImpl implements BookieClient, 
PerChannelBookieClientFac
     private final ExtensionRegistry registry;
 
     private final ClientConfiguration conf;
+    private final ClientConfiguration v3Conf;
+    private final boolean useV3Enforced;
     private volatile boolean closed;
     private final ReentrantReadWriteLock closeLock;
     private final StatsLogger statsLogger;
@@ -105,6 +107,9 @@ public class BookieClientImpl implements BookieClient, 
PerChannelBookieClientFac
                             OrderedExecutor executor, ScheduledExecutorService 
scheduler,
                             StatsLogger statsLogger) throws IOException {
         this.conf = conf;
+        this.v3Conf = new ClientConfiguration(conf);
+        this.v3Conf.setUseV2WireProtocol(false);
+        this.useV3Enforced = conf.getUseV2WireProtocol();
         this.eventLoopGroup = eventLoopGroup;
         this.allocator = allocator;
         this.executor = executor;
@@ -179,13 +184,17 @@ public class BookieClientImpl implements BookieClient, 
PerChannelBookieClientFac
 
     @Override
     public PerChannelBookieClient create(BookieSocketAddress address, 
PerChannelBookieClientPool pcbcPool,
-            SecurityHandlerFactory shFactory) throws SecurityException {
+            SecurityHandlerFactory shFactory, boolean forceUseV3) throws 
SecurityException {
         StatsLogger statsLoggerForPCBC = statsLogger;
         if (conf.getLimitStatsLogging()) {
             statsLoggerForPCBC = NullStatsLogger.INSTANCE;
         }
-        return new PerChannelBookieClient(conf, executor, eventLoopGroup, 
allocator, address, statsLoggerForPCBC,
-                authProviderFactory, registry, pcbcPool, shFactory);
+        ClientConfiguration clientConfiguration = conf;
+        if (forceUseV3) {
+            clientConfiguration = v3Conf;
+        }
+        return new PerChannelBookieClient(clientConfiguration, executor, 
eventLoopGroup, allocator, address,
+                                   statsLoggerForPCBC, authProviderFactory, 
registry, pcbcPool, shFactory);
     }
 
     public PerChannelBookieClientPool lookupClient(BookieSocketAddress addr) {
@@ -267,7 +276,7 @@ public class BookieClientImpl implements BookieClient, 
PerChannelBookieClientFac
             }
 
             toSend.release();
-        }, ledgerId);
+        }, ledgerId, useV3Enforced);
     }
 
     private void completeAdd(final int rc,
@@ -467,7 +476,7 @@ public class BookieClientImpl implements BookieClient, 
PerChannelBookieClientFac
             } else {
                 pcbc.readLac(ledgerId, cb, ctx);
             }
-        }, ledgerId);
+        }, ledgerId, useV3Enforced);
     }
 
     public void readEntry(BookieSocketAddress addr, long ledgerId, long 
entryId,
@@ -546,7 +555,7 @@ public class BookieClientImpl implements BookieClient, 
PerChannelBookieClientFac
             } else {
                 pcbc.getBookieInfo(requested, cb, ctx);
             }
-        }, requested);
+        }, requested, useV3Enforced);
     }
 
     private void monitorPendingOperations() {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
index 04471d5..71193de 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
@@ -48,6 +48,7 @@ class DefaultPerChannelBookieClientPool implements 
PerChannelBookieClientPool,
     final BookieSocketAddress address;
 
     final PerChannelBookieClient[] clients;
+    final PerChannelBookieClient[] clientsV3Enforced;
 
     final ClientConfiguration conf;
     SecurityHandlerFactory shFactory;
@@ -63,12 +64,20 @@ class DefaultPerChannelBookieClientPool implements 
PerChannelBookieClientPool,
         this.address = address;
         this.conf = conf;
 
-        this.shFactory = SecurityProviderFactoryFactory
-                .getSecurityProviderFactory(conf.getTLSProviderFactoryClass());
+        this.shFactory = 
SecurityProviderFactoryFactory.getSecurityProviderFactory(conf.getTLSProviderFactoryClass());
 
         this.clients = new PerChannelBookieClient[coreSize];
         for (int i = 0; i < coreSize; i++) {
-            this.clients[i] = factory.create(address, this, shFactory);
+            this.clients[i] = factory.create(address, this, shFactory, false);
+        }
+
+        if (conf.getUseV2WireProtocol()) {
+            this.clientsV3Enforced = new PerChannelBookieClient[coreSize];
+            for (int i = 0; i < coreSize; i++) {
+                this.clientsV3Enforced[i] = factory.create(address, this, 
shFactory, true);
+            }
+        } else {
+            this.clientsV3Enforced = this.clients;
         }
     }
 
@@ -85,16 +94,31 @@ class DefaultPerChannelBookieClientPool implements 
PerChannelBookieClientPool,
     }
 
     private PerChannelBookieClient getClient(long key) {
-        if (1 == clients.length) {
-            return clients[0];
+        return getClient(key, false);
+    }
+
+    private PerChannelBookieClient getClient(long key, 
PerChannelBookieClient[] pcbc) {
+        if (1 == pcbc.length) {
+            return pcbc[0];
         }
-        int idx = MathUtils.signSafeMod(key, clients.length);
-        return clients[idx];
+        int idx = MathUtils.signSafeMod(key, pcbc.length);
+        return pcbc[idx];
+    }
+    private PerChannelBookieClient getClient(long key, boolean forceUseV3) {
+        if (forceUseV3) {
+            return getClient(key, clientsV3Enforced);
+        }
+        return getClient(key, clients);
     }
 
     @Override
     public void obtain(GenericCallback<PerChannelBookieClient> callback, long 
key) {
-        getClient(key).connectIfNeededAndDoOp(callback);
+        obtain(callback, key, false);
+    }
+
+    @Override
+    public void obtain(GenericCallback<PerChannelBookieClient> callback, long 
key, boolean forceUseV3) {
+        getClient(key, forceUseV3).connectIfNeededAndDoOp(callback);
     }
 
     @Override
@@ -106,6 +130,9 @@ class DefaultPerChannelBookieClientPool implements 
PerChannelBookieClientPool,
     public void checkTimeoutOnPendingOperations() {
         for (int i = 0; i < clients.length; i++) {
             clients[i].checkTimeoutOnPendingOperations();
+            if (clients != clientsV3Enforced) {
+                clientsV3Enforced[i].checkTimeoutOnPendingOperations();
+            }
         }
     }
 
@@ -116,15 +143,21 @@ class DefaultPerChannelBookieClientPool implements 
PerChannelBookieClientPool,
 
     @Override
     public void disconnect(boolean wait) {
-        for (PerChannelBookieClient pcbc : clients) {
-            pcbc.disconnect(wait);
+        for (int i = 0; i < clients.length; i++) {
+            clients[i].disconnect();
+            if (clients != clientsV3Enforced) {
+                clientsV3Enforced[i].disconnect();
+            }
         }
     }
 
     @Override
     public void close(boolean wait) {
-        for (PerChannelBookieClient pcbc : clients) {
-            pcbc.close(wait);
+        for (int i = 0; i < clients.length; i++) {
+            clients[i].close(wait);
+            if (clients != clientsV3Enforced) {
+                clientsV3Enforced[i].close(wait);
+            }
         }
     }
 
@@ -134,6 +167,11 @@ class DefaultPerChannelBookieClientPool implements 
PerChannelBookieClientPool,
         for (PerChannelBookieClient pcbc : clients) {
             numPending += pcbc.getNumPendingCompletionRequests();
         }
+        if (clients != clientsV3Enforced) {
+            for (PerChannelBookieClient pcbc : clientsV3Enforced) {
+                numPending += pcbc.getNumPendingCompletionRequests();
+            }
+        }
         return numPending;
     }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientFactory.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientFactory.java
index 17abb56..48797cd 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientFactory.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientFactory.java
@@ -36,6 +36,7 @@ interface PerChannelBookieClientFactory {
      * @return the client connected to address.
      * @throws SecurityException
      */
-    PerChannelBookieClient create(BookieSocketAddress address,
-            PerChannelBookieClientPool pcbcPool, SecurityHandlerFactory 
shFactory) throws SecurityException;
+    PerChannelBookieClient create(BookieSocketAddress address, 
PerChannelBookieClientPool pcbcPool,
+                                  SecurityHandlerFactory shFactory,
+                                  boolean forceUseV3) throws SecurityException;
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
index aa7a5e9..c7bc005 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
@@ -41,6 +41,16 @@ public interface PerChannelBookieClientPool {
     void obtain(GenericCallback<PerChannelBookieClient> callback, long key);
 
     /**
+     * Obtain a channel from channel pool by version to execute operations.
+     *
+     * @param callback
+     *          callback to return channel from channel pool
+     * @param forceUseV3
+     *          whether or not use v3 protocol for connection
+     */
+    void obtain(GenericCallback<PerChannelBookieClient> callback, long key, 
boolean forceUseV3);
+
+    /**
      * Returns status of a client.
      * It is suggested to delay/throttle requests to this channel if 
isWritable is false.
      *
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ExplicitLacTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ExplicitLacTest.java
index 5c5c24b..f92cbce 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ExplicitLacTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ExplicitLacTest.java
@@ -340,5 +340,25 @@ public class ExplicitLacTest extends 
BookKeeperClusterTestCase {
         bkcWithExplicitLAC.close();
     }
 
+    @Test
+    public void fallbackV3() throws Exception {
+        ClientConfiguration v2Conf = new ClientConfiguration();
+        v2Conf.setUseV2WireProtocol(true);
+        v2Conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+        v2Conf.setExplictLacInterval(10);
+
+        BookKeeper bookKeeper = new BookKeeper(v2Conf);
+        LedgerHandle write = (LedgerHandle) bookKeeper.createLedger(1,
+                                                                    1,
+                                                                    1,
+                                                                    
DigestType.MAC,
+                                                                    
"pass".getBytes());
+        write.addEntry("test".getBytes());
+        TestUtils.waitUntilExplicitLacUpdated(write, 0);
+        long lac = write.readExplicitLastConfirmed();
+        assertEquals(0, lac);
+        write.close();
+        bookKeeper.close();
+    }
 
 }

Reply via email to