This is an automated email from the ASF dual-hosted git repository. randgalt pushed a commit to branch zk36 in repository https://gitbox.apache.org/repos/asf/curator.git
commit 59aedf55788a8a9a08026b8f740b2ec44c88d810 Author: randgalt <[email protected]> AuthorDate: Fri Oct 11 00:27:55 2019 +0300 wip --- .../org/apache/curator/utils/Compatibility.java | 11 +++ .../curator/utils/DefaultZookeeperFactory.java | 17 ++++ .../framework/imps/CuratorFrameworkImpl.java | 9 +++ .../imps/CuratorMultiTransactionRecord.java | 34 +++++--- .../framework/imps/ReconfigBuilderImpl.java | 5 +- .../recipes/leader/ChaosMonkeyCnxnFactory.java | 8 +- curator-test/pom.xml | 10 +++ .../org/apache/curator/test/Compatibility.java | 93 ++++++++++++++++++++++ .../apache/curator/test/TestingQuorumPeerMain.java | 2 +- .../apache/curator/test/TestingZooKeeperMain.java | 6 +- pom.xml | 21 ++++- 11 files changed, 193 insertions(+), 23 deletions(-) diff --git a/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java b/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java index 1ee2301..f9810fe 100644 --- a/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java +++ b/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java @@ -32,6 +32,7 @@ public class Compatibility { private static final boolean hasZooKeeperAdmin; private static final Method queueEventMethod; + private static final Logger logger = LoggerFactory.getLogger(Compatibility.class); static @@ -74,6 +75,16 @@ public class Compatibility } /** + * Return true if the ZooKeeperAdmin class is available + * + * @return true/false + */ + public static boolean hasZooKeeperAdmin() + { + return hasZooKeeperAdmin; + } + + /** * For ZooKeeper 3.5.x, use the supported <code>zooKeeper.getTestable().injectSessionExpiration()</code>. * For ZooKeeper 3.4.x do the equivalent via reflection * diff --git a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java b/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java index 42279d0..f936518 100644 --- a/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java +++ b/curator-client/src/main/java/org/apache/curator/utils/DefaultZookeeperFactory.java @@ -20,12 +20,29 @@ package org.apache.curator.utils; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.admin.ZooKeeperAdmin; public class DefaultZookeeperFactory implements ZookeeperFactory { + // hide org.apache.zookeeper.admin.ZooKeeperAdmin in a nested class so that Curator continues to work with ZK 3.4.x + private static class ZooKeeperAdminMaker implements ZookeeperFactory + { + static final ZooKeeperAdminMaker instance = new ZooKeeperAdminMaker(); + + @Override + public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception + { + return new ZooKeeperAdmin(connectString, sessionTimeout, watcher, canBeReadOnly); + } + } + @Override public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception { + if ( Compatibility.hasZooKeeperAdmin() ) + { + return ZooKeeperAdminMaker.instance.newZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly); + } return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly); } } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index e003bf0..bc59e77 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -620,6 +620,15 @@ public class CuratorFrameworkImpl implements CuratorFramework return client.getZooKeeper(); } + Object getZooKeeperAdmin() throws Exception + { + if ( isZk34CompatibilityMode() ) + { + Preconditions.checkState(!isZk34CompatibilityMode(), "getZooKeeperAdmin() is not supported when running in ZooKeeper 3.4 compatibility mode"); + } + return client.getZooKeeper(); + } + CompressionProvider getCompressionProvider() { return compressionProvider; diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java index 3e72609..fbac6e6 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java @@ -16,49 +16,57 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.curator.framework.imps; import com.google.common.collect.Lists; import org.apache.curator.framework.api.transaction.OperationType; import org.apache.curator.framework.api.transaction.TypeAndPath; -import org.apache.zookeeper.MultiTransactionRecord; import org.apache.zookeeper.Op; import java.security.MessageDigest; +import java.util.ArrayList; +import java.util.Iterator; import java.util.List; -class CuratorMultiTransactionRecord extends MultiTransactionRecord +class CuratorMultiTransactionRecord implements Iterable<Op> { - private final List<TypeAndPath> metadata = Lists.newArrayList(); - - @Override - public final void add(Op op) - { - throw new UnsupportedOperationException(); - } + private final List<TypeAndPath> metadata = Lists.newArrayList(); + private List<Op> ops = new ArrayList<>(); void add(Op op, OperationType type, String forPath) { - super.add(op); + ops.add(op); metadata.add(new TypeAndPath(type, forPath)); } - TypeAndPath getMetadata(int index) + TypeAndPath getMetadata(int index) { return metadata.get(index); } - int metadataSize() + int metadataSize() { return metadata.size(); } void addToDigest(MessageDigest digest) { - for ( Op op : this ) + for ( Op op : ops ) { digest.update(op.getPath().getBytes()); digest.update(Integer.toString(op.getType()).getBytes()); digest.update(op.toRequestRecord().toString().getBytes()); } } + + @Override + public Iterator<Op> iterator() + { + return ops.iterator(); + } + + int size() + { + return ops.size(); + } } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java index 97be59a..9f129ca 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java @@ -24,6 +24,7 @@ import org.apache.curator.RetryLoop; import org.apache.curator.TimeTrace; import org.apache.curator.framework.api.*; import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.admin.ZooKeeperAdmin; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.server.DataTree; import java.util.Arrays; @@ -268,7 +269,7 @@ public class ReconfigBuilderImpl implements ReconfigBuilder, BackgroundOperation client.processBackgroundOperation(data, event); } }; - client.getZooKeeper().reconfig(joining, leaving, newMembers, fromConfig, callback, backgrounding.getContext()); + ((ZooKeeperAdmin)client.getZooKeeperAdmin()).reconfigure(joining, leaving, newMembers, fromConfig, callback, backgrounding.getContext()); } catch ( Throwable e ) { @@ -287,7 +288,7 @@ public class ReconfigBuilderImpl implements ReconfigBuilder, BackgroundOperation @Override public byte[] call() throws Exception { - return client.getZooKeeper().reconfig(joining, leaving, newMembers, fromConfig, responseStat); + return ((ZooKeeperAdmin)client.getZooKeeperAdmin()).reconfigure(joining, leaving, newMembers, fromConfig, responseStat); } } ); diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java index 4cb342c..07e9a17 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java @@ -19,11 +19,11 @@ package org.apache.curator.framework.recipes.leader; +import org.apache.curator.test.Compatibility; import org.apache.curator.test.TestingZooKeeperMain; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.proto.CreateRequest; import org.apache.zookeeper.server.ByteBufferInputStream; -import org.apache.zookeeper.server.NIOServerCnxn; import org.apache.zookeeper.server.NIOServerCnxnFactory; import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.ZooKeeperServer; @@ -92,7 +92,7 @@ public class ChaosMonkeyCnxnFactory extends NIOServerCnxnFactory log.debug("Rejected : " + si.toString()); // Still reject request log.debug("Still not ready for " + remaining + "ms"); - ((NIOServerCnxn)si.cnxn).close(); + Compatibility.serverCnxnClose(si.cnxn); return; } // Submit the request to the legacy Zookeeper server @@ -113,13 +113,13 @@ public class ChaosMonkeyCnxnFactory extends NIOServerCnxnFactory firstError = System.currentTimeMillis(); // The znode has been created, close the connection and don't tell it to client log.warn("Closing connection right after " + createRequest.getPath() + " creation"); - ((NIOServerCnxn)si.cnxn).close(); + Compatibility.serverCnxnClose(si.cnxn); } } catch ( Exception e ) { // Should not happen - ((NIOServerCnxn)si.cnxn).close(); + Compatibility.serverCnxnClose(si.cnxn); } } } diff --git a/curator-test/pom.xml b/curator-test/pom.xml index 3683b7d..4f0c5a2 100644 --- a/curator-test/pom.xml +++ b/curator-test/pom.xml @@ -41,6 +41,16 @@ </dependency> <dependency> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-core</artifactId> + </dependency> + + <dependency> + <groupId>org.xerial.snappy</groupId> + <artifactId>snappy-java</artifactId> + </dependency> + + <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> diff --git a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java b/curator-test/src/main/java/org/apache/curator/test/Compatibility.java index 5b4b53f..1f8d181 100644 --- a/curator-test/src/main/java/org/apache/curator/test/Compatibility.java +++ b/curator-test/src/main/java/org/apache/curator/test/Compatibility.java @@ -18,10 +18,103 @@ */ package org.apache.curator.test; +import org.apache.zookeeper.server.ServerCnxn; +import org.apache.zookeeper.server.ServerCnxnFactory; +import java.lang.reflect.Method; + +@SuppressWarnings("unchecked") public class Compatibility { + private static final Method closeAllWithReasonMethod; + private static final Method closeAllMethod; + private static final Method closeWithReasonMethod; + private static final Method closeMethod; + private static final Object disconnectReasonObj; + + static + { + Object localDisconnectReasonObj; + Method localCloseAllWithReasonMethod; + Method localCloseAllMethod; + Method localCloseWithReasonMethod; + Method localCloseMethod; + try + { + Class disconnectReasonClass = Class.forName("org.apache.zookeeper.server.ServerCnxn$DisconnectReason"); + localDisconnectReasonObj = Enum.valueOf(disconnectReasonClass, "UNKNOWN"); + localCloseAllWithReasonMethod = ServerCnxnFactory.class.getDeclaredMethod("closeAll", disconnectReasonClass); + localCloseWithReasonMethod = ServerCnxn.class.getDeclaredMethod("close", disconnectReasonClass); + localCloseAllMethod = null; + localCloseMethod = null; + + localCloseAllWithReasonMethod.setAccessible(true); + localCloseWithReasonMethod.setAccessible(true); + } + catch ( Throwable e ) + { + localDisconnectReasonObj = null; + localCloseAllWithReasonMethod = null; + localCloseWithReasonMethod = null; + try + { + localCloseAllMethod = ServerCnxnFactory.class.getDeclaredMethod("closeAll"); + localCloseMethod = ServerCnxn.class.getDeclaredMethod("close"); + + localCloseAllMethod.setAccessible(true); + localCloseMethod.setAccessible(true); + } + catch ( Throwable ex ) + { + throw new IllegalStateException("Could not reflectively find ServerCnxnFactory/ServerCnxn close methods"); + } + } + disconnectReasonObj = localDisconnectReasonObj; + closeAllWithReasonMethod = localCloseAllWithReasonMethod; + closeAllMethod = localCloseAllMethod; + closeMethod = localCloseMethod; + closeWithReasonMethod = localCloseWithReasonMethod; + } + public static boolean isZK34() { return false; } + + public static void serverCnxnFactoryCloseAll(ServerCnxnFactory factory) + { + try + { + if ( closeAllMethod != null ) + { + closeAllMethod.invoke(factory); + } + else + { + closeAllWithReasonMethod.invoke(factory, disconnectReasonObj); + } + } + catch ( Exception e ) + { + throw new RuntimeException("Could not close factory", e); + } + } + + public static void serverCnxnClose(ServerCnxn cnxn) + { + try + { + if ( closeMethod != null ) + { + closeMethod.invoke(cnxn); + } + else + { + closeWithReasonMethod.invoke(cnxn, disconnectReasonObj); + } + } + catch ( Exception e ) + { + throw new RuntimeException("Could not close connection", e); + } + } } diff --git a/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java b/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java index 3b3ab26..7489527 100644 --- a/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java +++ b/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java @@ -39,7 +39,7 @@ class TestingQuorumPeerMain extends QuorumPeerMain implements ZooKeeperMainFace Field cnxnFactoryField = QuorumPeer.class.getDeclaredField("cnxnFactory"); cnxnFactoryField.setAccessible(true); ServerCnxnFactory cnxnFactory = (ServerCnxnFactory)cnxnFactoryField.get(quorumPeer); - cnxnFactory.closeAll(); + Compatibility.serverCnxnFactoryCloseAll(cnxnFactory); Field ssField = cnxnFactory.getClass().getDeclaredField("ss"); ssField.setAccessible(true); diff --git a/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java b/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java index 841df77..91f185f 100644 --- a/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java +++ b/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java @@ -81,7 +81,7 @@ public class TestingZooKeeperMain implements ZooKeeperMainFace { if ( cnxnFactory != null ) { - cnxnFactory.closeAll(); + Compatibility.serverCnxnFactoryCloseAll(cnxnFactory); Field ssField = cnxnFactory.getClass().getDeclaredField("ss"); ssField.setAccessible(true); @@ -262,7 +262,9 @@ public class TestingZooKeeperMain implements ZooKeeperMainFace { public TestZooKeeperServer(FileTxnSnapLog txnLog, ServerConfig config) { - super(txnLog, config.getTickTime(), config.getMinSessionTimeout(), config.getMaxSessionTimeout(), null); + this.setTxnLogFactory(txnLog); + this.setMinSessionTimeout(config.getMinSessionTimeout()); + this.setMaxSessionTimeout(config.getMaxSessionTimeout()); } private final AtomicBoolean isRunning = new AtomicBoolean(false); diff --git a/pom.xml b/pom.xml index 3a5e152..2ce0de1 100644 --- a/pom.xml +++ b/pom.xml @@ -85,10 +85,11 @@ <guava-failureaccess-version>1.0.1</guava-failureaccess-version> <testng-version>6.14.3</testng-version> <swift-version>0.23.1</swift-version> - <dropwizard-version>1.3.7</dropwizard-version> <maven-shade-plugin-version>3.2.1</maven-shade-plugin-version> <slf4j-version>1.7.25</slf4j-version> <clirr-maven-plugin-version>2.8</clirr-maven-plugin-version> + <dropwizard-version>3.2.5</dropwizard-version> + <snappy-version>1.1.7</snappy-version> <!-- OSGi Properties --> <osgi.export.package /> @@ -567,6 +568,24 @@ <artifactId>dropwizard-logging</artifactId> <version>${dropwizard-version}</version> </dependency> + + <dependency> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-core</artifactId> + <version>${dropwizard-version}</version> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.xerial.snappy</groupId> + <artifactId>snappy-java</artifactId> + <version>${snappy-version}</version> + </dependency> </dependencies> </dependencyManagement>
