Repository: bookkeeper Updated Branches: refs/heads/master e32c38890 -> b1c12c0f4
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java new file mode 100644 index 0000000..fe87ac9 --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java @@ -0,0 +1,239 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.proto; + +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.proto.BookieProtocol; +import org.apache.bookkeeper.test.BookKeeperClusterTestCase; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.protobuf.ExtensionRegistry; + +import org.apache.bookkeeper.auth.ClientAuthProvider; +import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.net.BookieSocketAddress; +import org.apache.bookkeeper.proto.PerChannelBookieClient; +import org.apache.bookkeeper.proto.BookieServer; +import org.apache.bookkeeper.util.OrderedSafeExecutor; +import org.apache.bookkeeper.auth.TestAuth; +import org.jboss.netty.bootstrap.ClientBootstrap; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ChannelStateEvent; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.socket.ClientSocketChannelFactory; +import org.apache.bookkeeper.auth.AuthProviderFactoryFactory; +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; + +import org.apache.bookkeeper.proto.BookieProtocol.*; +import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage; + +import java.util.concurrent.ArrayBlockingQueue; + +import static org.junit.Assert.*; + +public class TestBackwardCompatCMS42 extends BookKeeperClusterTestCase { + static final Logger LOG = LoggerFactory.getLogger(TestBackwardCompatCMS42.class); + + ExtensionRegistry extRegistry = ExtensionRegistry.newInstance(); + ClientAuthProvider.Factory authProvider; + ClientSocketChannelFactory channelFactory + = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), + Executors.newCachedThreadPool()); + OrderedSafeExecutor executor = OrderedSafeExecutor.newBuilder().numThreads(1).name("TestBackwardCompatClient") + .build(); + + public TestBackwardCompatCMS42() throws Exception { + super(0); + + TestDataFormats.registerAllExtensions(extRegistry); + authProvider = AuthProviderFactoryFactory.newClientAuthProviderFactory( + new ClientConfiguration(), extRegistry); + } + + @Test(timeout=60000) + public void testAuthSingleMessage() throws Exception { + ServerConfiguration bookieConf = newServerConfiguration(); + bookieConf.setBookieAuthProviderFactoryClass( + TestAuth.AlwaysSucceedBookieAuthProviderFactory.class.getName()); + BookieServer bookie1 = startAndStoreBookie(bookieConf); + + AuthMessage.Builder builder = AuthMessage.newBuilder() + .setAuthPluginName(TestAuth.TEST_AUTH_PROVIDER_PLUGIN_NAME); + builder.setExtension(TestDataFormats.messageType, + TestDataFormats.AuthMessageType.PAYLOAD_MESSAGE); + final AuthMessage authMessage = builder.build(); + + CompatClient42 client = newCompatClient(bookie1.getLocalAddress()); + + Request request = new AuthRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, authMessage); + client.sendRequest(request); + + Response response = client.takeResponse(); + assertTrue("Should be auth response", response instanceof AuthResponse); + assertEquals("Should have succeeded", response.getErrorCode(), BookieProtocol.EOK); + } + + @Test(timeout=60000) + public void testAuthMultiMessage() throws Exception { + ServerConfiguration bookieConf = newServerConfiguration(); + bookieConf.setBookieAuthProviderFactoryClass( + TestAuth.SucceedAfter3BookieAuthProviderFactory.class.getName()); + BookieServer bookie1 = startAndStoreBookie(bookieConf); + + AuthMessage.Builder builder = AuthMessage.newBuilder() + .setAuthPluginName(TestAuth.TEST_AUTH_PROVIDER_PLUGIN_NAME); + builder.setExtension(TestDataFormats.messageType, + TestDataFormats.AuthMessageType.PAYLOAD_MESSAGE); + final AuthMessage authMessage = builder.build(); + CompatClient42 client = newCompatClient(bookie1.getLocalAddress()); + + Request request = new AuthRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, authMessage); + for (int i = 0; i < 3 ; i++) { + client.sendRequest(request); + Response response = client.takeResponse(); + assertTrue("Should be auth response", response instanceof AuthResponse); + AuthResponse authResponse = (AuthResponse)response; + assertEquals("Should have succeeded", + response.getErrorCode(), BookieProtocol.EOK); + TestDataFormats.AuthMessageType type = authResponse.getAuthMessage() + .getExtension(TestDataFormats.messageType); + if (i == 2) { + assertEquals("Should succeed after 3", + type, TestDataFormats.AuthMessageType.SUCCESS_RESPONSE); + } else { + assertEquals("Should be payload", type, + TestDataFormats.AuthMessageType.PAYLOAD_MESSAGE); + } + } + } + + @Test(timeout=60000) + public void testAuthFail() throws Exception { + ServerConfiguration bookieConf = newServerConfiguration(); + bookieConf.setBookieAuthProviderFactoryClass( + TestAuth.FailAfter3BookieAuthProviderFactory.class.getName()); + BookieServer bookie1 = startAndStoreBookie(bookieConf); + + AuthMessage.Builder builder = AuthMessage.newBuilder() + .setAuthPluginName(TestAuth.TEST_AUTH_PROVIDER_PLUGIN_NAME); + builder.setExtension(TestDataFormats.messageType, + TestDataFormats.AuthMessageType.PAYLOAD_MESSAGE); + final AuthMessage authMessage = builder.build(); + CompatClient42 client = newCompatClient(bookie1.getLocalAddress()); + + Request request = new AuthRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, authMessage); + for (int i = 0; i < 3 ; i++) { + client.sendRequest(request); + Response response = client.takeResponse(); + assertTrue("Should be auth response", response instanceof AuthResponse); + AuthResponse authResponse = (AuthResponse)response; + assertEquals("Should have succeeded", + response.getErrorCode(), BookieProtocol.EOK); + TestDataFormats.AuthMessageType type = authResponse.getAuthMessage() + .getExtension(TestDataFormats.messageType); + if (i == 2) { + assertEquals("Should fail after 3", + type, TestDataFormats.AuthMessageType.FAILURE_RESPONSE); + } else { + assertEquals("Should be payload", type, + TestDataFormats.AuthMessageType.PAYLOAD_MESSAGE); + } + + } + + client.sendRequest(new ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, + 1L, 1L, (short)0)); + Response response = client.takeResponse(); + assertEquals("Should have failed", + response.getErrorCode(), BookieProtocol.EUA); + } + + // copy from TestAuth + BookieServer startAndStoreBookie(ServerConfiguration conf) throws Exception { + bsConfs.add(conf); + BookieServer s = startBookie(conf); + bs.add(s); + return s; + } + + CompatClient42 newCompatClient(BookieSocketAddress addr) throws Exception { + return new CompatClient42(executor, channelFactory, addr, authProvider, extRegistry); + } + + // extending PerChannelBookieClient to get the pipeline factory + class CompatClient42 extends PerChannelBookieClient { + final ArrayBlockingQueue<Response> responses = new ArrayBlockingQueue<Response>(10); + final Channel channel; + final CountDownLatch connected = new CountDownLatch(1); + + CompatClient42(OrderedSafeExecutor executor, ClientSocketChannelFactory channelFactory, + BookieSocketAddress addr, + ClientAuthProvider.Factory authProviderFactory, + ExtensionRegistry extRegistry) throws Exception { + super(executor, channelFactory, addr, authProviderFactory, extRegistry); + + ClientBootstrap bootstrap = new ClientBootstrap(channelFactory); + bootstrap.setPipelineFactory(this); + bootstrap.setOption("tcpNoDelay", false); + bootstrap.setOption("keepAlive", true); + ChannelFuture f = bootstrap.connect(addr.getSocketAddress()).await(); + channel = f.getChannel(); + } + + @Override + public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { + if (!(e.getMessage() instanceof Response)) { + LOG.error("Unknown message {}, passing upstream", e.getMessage()); + ctx.sendUpstream(e); + return; + } + responses.add((Response)e.getMessage()); + } + + @Override + public void channelConnected(ChannelHandlerContext ctx, + ChannelStateEvent e) + throws Exception { + connected.countDown(); + } + + Response takeResponse() throws Exception { + return responses.take(); + } + + Response pollResponse() throws Exception { + return responses.poll(); + } + + void sendRequest(Request request) throws Exception { + connected.await(); + channel.write(request); + } + } +} + http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestDataFormats.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestDataFormats.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestDataFormats.java new file mode 100644 index 0000000..c3f675f --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestDataFormats.java @@ -0,0 +1,126 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: src/test/proto/TestDataFormats.proto + +package org.apache.bookkeeper.proto; + +public final class TestDataFormats { + private TestDataFormats() {} + public static void registerAllExtensions( + com.google.protobuf.ExtensionRegistry registry) { + registry.add(org.apache.bookkeeper.proto.TestDataFormats.messageType); + } + public enum AuthMessageType + implements com.google.protobuf.ProtocolMessageEnum { + SUCCESS_RESPONSE(0, 1), + FAILURE_RESPONSE(1, 2), + PAYLOAD_MESSAGE(2, 3), + ; + + public static final int SUCCESS_RESPONSE_VALUE = 1; + public static final int FAILURE_RESPONSE_VALUE = 2; + public static final int PAYLOAD_MESSAGE_VALUE = 3; + + + public final int getNumber() { return value; } + + public static AuthMessageType valueOf(int value) { + switch (value) { + case 1: return SUCCESS_RESPONSE; + case 2: return FAILURE_RESPONSE; + case 3: return PAYLOAD_MESSAGE; + default: return null; + } + } + + public static com.google.protobuf.Internal.EnumLiteMap<AuthMessageType> + internalGetValueMap() { + return internalValueMap; + } + private static com.google.protobuf.Internal.EnumLiteMap<AuthMessageType> + internalValueMap = + new com.google.protobuf.Internal.EnumLiteMap<AuthMessageType>() { + public AuthMessageType findValueByNumber(int number) { + return AuthMessageType.valueOf(number); + } + }; + + public final com.google.protobuf.Descriptors.EnumValueDescriptor + getValueDescriptor() { + return getDescriptor().getValues().get(index); + } + public final com.google.protobuf.Descriptors.EnumDescriptor + getDescriptorForType() { + return getDescriptor(); + } + public static final com.google.protobuf.Descriptors.EnumDescriptor + getDescriptor() { + return org.apache.bookkeeper.proto.TestDataFormats.getDescriptor().getEnumTypes().get(0); + } + + private static final AuthMessageType[] VALUES = { + SUCCESS_RESPONSE, FAILURE_RESPONSE, PAYLOAD_MESSAGE, + }; + + public static AuthMessageType valueOf( + com.google.protobuf.Descriptors.EnumValueDescriptor desc) { + if (desc.getType() != getDescriptor()) { + throw new java.lang.IllegalArgumentException( + "EnumValueDescriptor is not for this type."); + } + return VALUES[desc.getIndex()]; + } + + private final int index; + private final int value; + + private AuthMessageType(int index, int value) { + this.index = index; + this.value = value; + } + + // @@protoc_insertion_point(enum_scope:AuthMessageType) + } + + public static final int MESSAGETYPE_FIELD_NUMBER = 1000; + public static final + com.google.protobuf.GeneratedMessage.GeneratedExtension< + org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage, + org.apache.bookkeeper.proto.TestDataFormats.AuthMessageType> messageType = com.google.protobuf.GeneratedMessage + .newFileScopedGeneratedExtension( + org.apache.bookkeeper.proto.TestDataFormats.AuthMessageType.class, + null); + + public static com.google.protobuf.Descriptors.FileDescriptor + getDescriptor() { + return descriptor; + } + private static com.google.protobuf.Descriptors.FileDescriptor + descriptor; + static { + java.lang.String[] descriptorData = { + "\n$src/test/proto/TestDataFormats.proto\032\'" + + "src/main/proto/BookkeeperProtocol.proto*" + + "R\n\017AuthMessageType\022\024\n\020SUCCESS_RESPONSE\020\001" + + "\022\024\n\020FAILURE_RESPONSE\020\002\022\023\n\017PAYLOAD_MESSAG" + + "E\020\003:4\n\013messageType\022\014.AuthMessage\030\350\007 \002(\0162" + + "\020.AuthMessageTypeB\037\n\033org.apache.bookkeep" + + "er.protoH\001" + }; + com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = + new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { + public com.google.protobuf.ExtensionRegistry assignDescriptors( + com.google.protobuf.Descriptors.FileDescriptor root) { + descriptor = root; + messageType.internalInit(descriptor.getExtensions().get(0)); + return null; + } + }; + com.google.protobuf.Descriptors.FileDescriptor + .internalBuildGeneratedFileFrom(descriptorData, + new com.google.protobuf.Descriptors.FileDescriptor[] { + org.apache.bookkeeper.proto.BookkeeperProtocol.getDescriptor(), + }, assigner); + } + + // @@protoc_insertion_point(outer_class_scope) +} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java index ac6bd8d..b43f5cd 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java @@ -20,9 +20,12 @@ */ package org.apache.bookkeeper.proto; +import org.apache.bookkeeper.auth.ClientAuthProvider; +import org.apache.bookkeeper.auth.AuthProviderFactoryFactory; import org.apache.bookkeeper.bookie.Bookie; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback; @@ -38,6 +41,8 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.protobuf.ExtensionRegistry; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.CountDownLatch; @@ -54,10 +59,16 @@ import static org.junit.Assert.*; public class TestPerChannelBookieClient extends BookKeeperClusterTestCase { private final static Logger LOG = LoggerFactory.getLogger(TestPerChannelBookieClient.class); - public TestPerChannelBookieClient() { + ExtensionRegistry extRegistry = ExtensionRegistry.newInstance(); + ClientAuthProvider.Factory authProvider; + + public TestPerChannelBookieClient() throws Exception { super(1); + authProvider = AuthProviderFactoryFactory.newClientAuthProviderFactory( + new ClientConfiguration(), extRegistry); } + /** * Test that a race does not exist between connection completion * and client closure. If a race does exist, this test will simply @@ -74,7 +85,8 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase { BookieSocketAddress addr = getBookie(0); for (int i = 0; i < 1000; i++) { - PerChannelBookieClient client = new PerChannelBookieClient(executor, channelFactory, addr); + PerChannelBookieClient client = new PerChannelBookieClient(executor, channelFactory, addr, + authProvider, extRegistry); client.connectIfNeededAndDoOp(new GenericCallback<PerChannelBookieClient>() { @Override public void operationComplete(int rc, PerChannelBookieClient client) { @@ -118,7 +130,8 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase { BookieSocketAddress addr = getBookie(0); for (int i = 0; i < 100; i++) { - PerChannelBookieClient client = new PerChannelBookieClient(executor, channelFactory, addr); + PerChannelBookieClient client = new PerChannelBookieClient(executor, channelFactory, addr, + authProvider, extRegistry); for (int j = i; j < 10; j++) { client.connectIfNeededAndDoOp(nullop); } @@ -150,7 +163,8 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase { OrderedSafeExecutor executor = getOrderedSafeExecutor(); BookieSocketAddress addr = getBookie(0); - final PerChannelBookieClient client = new PerChannelBookieClient(executor, channelFactory, addr); + final PerChannelBookieClient client = new PerChannelBookieClient(executor, channelFactory, + addr, authProvider, extRegistry); final AtomicBoolean shouldFail = new AtomicBoolean(false); final AtomicBoolean inconsistent = new AtomicBoolean(false); final AtomicBoolean running = new AtomicBoolean(true); @@ -247,7 +261,8 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase { final OrderedSafeExecutor executor = getOrderedSafeExecutor(); BookieSocketAddress addr = getBookie(0); - final PerChannelBookieClient client = new PerChannelBookieClient(executor, channelFactory, addr); + final PerChannelBookieClient client = new PerChannelBookieClient(executor, channelFactory, + addr, authProvider, extRegistry); final CountDownLatch completion = new CountDownLatch(1); final ReadEntryCallback cb = new ReadEntryCallback() { @Override http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java index 278dc8c..28de0b2 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java @@ -195,6 +195,10 @@ public abstract class BookKeeperClusterTestCase { f, new File[] { f }); } + protected ClientConfiguration newClientConfiguration() { + return new ClientConfiguration(baseConf); + } + protected ServerConfiguration newServerConfiguration(int port, String zkServers, File journalDir, File[] ledgerDirs) { ServerConfiguration conf = new ServerConfiguration(baseConf); conf.setBookiePort(port); @@ -289,9 +293,11 @@ public abstract class BookKeeperClusterTestCase { public void run() { try { bookie.suspendProcessing(); + LOG.info("bookie {} is asleep", bookie.getLocalAddress()); l.countDown(); Thread.sleep(seconds*1000); bookie.resumeProcessing(); + LOG.info("bookie {} is awake", bookie.getLocalAddress()); } catch (Exception e) { LOG.error("Error suspending bookie", e); } @@ -441,6 +447,10 @@ public abstract class BookKeeperClusterTestCase { BookieServer server = new BookieServer(conf); server.start(); + if (bkc == null) { + bkc = new BookKeeperTestClient(baseClientConf); + } + int port = conf.getBookiePort(); String host = InetAddress.getLocalHost().getHostAddress(); if (conf.getUseHostNameAsBookieID()) { http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/b1c12c0f/bookkeeper-server/src/test/proto/TestDataFormats.proto ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/proto/TestDataFormats.proto b/bookkeeper-server/src/test/proto/TestDataFormats.proto new file mode 100644 index 0000000..0c616d7 --- /dev/null +++ b/bookkeeper-server/src/test/proto/TestDataFormats.proto @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +option java_package = "org.apache.bookkeeper.proto"; +option optimize_for = SPEED; + +import "src/main/proto/BookkeeperProtocol.proto"; + +enum AuthMessageType { + SUCCESS_RESPONSE = 1; + FAILURE_RESPONSE = 2; + PAYLOAD_MESSAGE = 3; +} + +/** + * + */ +extend AuthMessage { + required AuthMessageType messageType = 1000; +}
