http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java b/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java deleted file mode 100644 index 0150a11..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java +++ /dev/null @@ -1,1320 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hedwig.server; - -import java.net.InetAddress; -import java.io.File; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import com.google.protobuf.ByteString; - -import org.junit.Test; -import static org.junit.Assert.*; - -import org.apache.bookkeeper.test.ZooKeeperUtil; -import org.apache.bookkeeper.test.PortManager; - -import org.apache.hedwig.util.HedwigSocketAddress; - -import org.apache.commons.configuration.Configuration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Test Backward Compatability between different versions - */ -public class TestBackwardCompat { - - private static final Logger logger = LoggerFactory.getLogger(TestBackwardCompat.class); - - static final int CONSUMEINTERVAL = 5; - static ZooKeeperUtil zkUtil = new ZooKeeperUtil(); - - static class BookKeeperCluster400 { - - int numBookies; - List<org.apache.hw_v4_0_0.bookkeeper.conf.ServerConfiguration> bkConfs; - List<org.apache.hw_v4_0_0.bookkeeper.proto.BookieServer> bks; - - BookKeeperCluster400(int numBookies) { - this.numBookies = numBookies; - } - - public void start() throws Exception { - zkUtil.startServer(); - - bks = new LinkedList<org.apache.hw_v4_0_0.bookkeeper.proto.BookieServer>(); - bkConfs = new LinkedList<org.apache.hw_v4_0_0.bookkeeper.conf.ServerConfiguration>(); - - for (int i=0; i<numBookies; i++) { - startBookieServer(); - } - } - - public void stop() throws Exception { - for (org.apache.hw_v4_0_0.bookkeeper.proto.BookieServer bs : bks) { - bs.shutdown(); - } - bks.clear(); - - zkUtil.killServer(); - } - - protected void startBookieServer() throws Exception { - int port = PortManager.nextFreePort(); - File tmpDir = org.apache.hw_v4_0_0.hedwig.util.FileUtils.createTempDirectory( - getClass().getName() + port, "test"); - org.apache.hw_v4_0_0.bookkeeper.conf.ServerConfiguration conf = 
newServerConfiguration( - port, zkUtil.getZooKeeperConnectString(), tmpDir, new File[] { tmpDir }); - bks.add(startBookie(conf)); - bkConfs.add(conf); - } - - protected org.apache.hw_v4_0_0.bookkeeper.conf.ServerConfiguration newServerConfiguration( - int port, String zkServers, File journalDir, File[] ledgerDirs) { - org.apache.hw_v4_0_0.bookkeeper.conf.ServerConfiguration conf = - new org.apache.hw_v4_0_0.bookkeeper.conf.ServerConfiguration(); - conf.setBookiePort(port); - conf.setZkServers(zkServers); - conf.setJournalDirName(journalDir.getPath()); - String[] ledgerDirNames = new String[ledgerDirs.length]; - for (int i=0; i<ledgerDirs.length; i++) { - ledgerDirNames[i] = ledgerDirs[i].getPath(); - } - conf.setLedgerDirNames(ledgerDirNames); - return conf; - } - - protected org.apache.hw_v4_0_0.bookkeeper.proto.BookieServer startBookie( - org.apache.hw_v4_0_0.bookkeeper.conf.ServerConfiguration conf) throws Exception { - org.apache.hw_v4_0_0.bookkeeper.proto.BookieServer server - = new org.apache.hw_v4_0_0.bookkeeper.proto.BookieServer(conf); - server.start(); - - int port = conf.getBookiePort(); - while (zkUtil.getZooKeeperClient().exists( - "/ledgers/available/" + InetAddress.getLocalHost().getHostAddress() + ":" + port, - false) == null) { - Thread.sleep(500); - } - return server; - } - } - - /** - * Version 4.0.0 classes - */ - static class Server400 { - org.apache.hw_v4_0_0.hedwig.server.common.ServerConfiguration conf; - org.apache.hw_v4_0_0.hedwig.server.netty.PubSubServer server; - - Server400(final String zkHosts, final int port, final int sslPort) { - conf = new org.apache.hw_v4_0_0.hedwig.server.common.ServerConfiguration() { - @Override - public String getZkHost() { - return zkHosts; - } - - @Override - public int getServerPort() { - return port; - } - - @Override - public int getSSLServerPort() { - return sslPort; - } - }; - } - - void start() throws Exception { - server = new org.apache.hw_v4_0_0.hedwig.server.netty.PubSubServer(conf); - } - - void 
stop() throws Exception { - if (null != server) { - server.shutdown(); - } - } - } - - static class Client400 { - org.apache.hw_v4_0_0.hedwig.client.conf.ClientConfiguration conf; - org.apache.hw_v4_0_0.hedwig.client.api.Client client; - org.apache.hw_v4_0_0.hedwig.client.api.Publisher publisher; - org.apache.hw_v4_0_0.hedwig.client.api.Subscriber subscriber; - - Client400(final String connectString) { - conf = new org.apache.hw_v4_0_0.hedwig.client.conf.ClientConfiguration() { - @Override - protected org.apache.hw_v4_0_0.hedwig.util.HedwigSocketAddress - getDefaultServerHedwigSocketAddress() { - return new org.apache.hw_v4_0_0.hedwig.util.HedwigSocketAddress(connectString); - } - }; - client = new org.apache.hw_v4_0_0.hedwig.client.HedwigClient(conf); - publisher = client.getPublisher(); - subscriber = client.getSubscriber(); - } - - void close() throws Exception { - if (null != client) { - client.close(); - } - } - - org.apache.hw_v4_0_0.hedwig.protocol.PubSubProtocol.MessageSeqId publish( - ByteString topic, ByteString data) throws Exception { - org.apache.hw_v4_0_0.hedwig.protocol.PubSubProtocol.Message message = - org.apache.hw_v4_0_0.hedwig.protocol.PubSubProtocol.Message.newBuilder() - .setBody(data).build(); - publisher.publish(topic, message); - return null; - } - } - - static class BookKeeperCluster410 { - - int numBookies; - List<org.apache.hw_v4_1_0.bookkeeper.conf.ServerConfiguration> bkConfs; - List<org.apache.hw_v4_1_0.bookkeeper.proto.BookieServer> bks; - - BookKeeperCluster410(int numBookies) { - this.numBookies = numBookies; - } - - public void start() throws Exception { - zkUtil.startServer(); - - bks = new LinkedList<org.apache.hw_v4_1_0.bookkeeper.proto.BookieServer>(); - bkConfs = new LinkedList<org.apache.hw_v4_1_0.bookkeeper.conf.ServerConfiguration>(); - - for (int i=0; i<numBookies; i++) { - startBookieServer(); - } - } - - public void stop() throws Exception { - for (org.apache.hw_v4_1_0.bookkeeper.proto.BookieServer bs : bks) { - 
bs.shutdown(); - } - bks.clear(); - - zkUtil.killServer(); - } - - protected void startBookieServer() throws Exception { - int port = PortManager.nextFreePort(); - File tmpDir = org.apache.hw_v4_1_0.hedwig.util.FileUtils.createTempDirectory( - getClass().getName() + port, "test"); - org.apache.hw_v4_1_0.bookkeeper.conf.ServerConfiguration conf = newServerConfiguration( - port, zkUtil.getZooKeeperConnectString(), tmpDir, new File[] { tmpDir }); - bks.add(startBookie(conf)); - bkConfs.add(conf); - } - - protected org.apache.hw_v4_1_0.bookkeeper.conf.ServerConfiguration newServerConfiguration( - int port, String zkServers, File journalDir, File[] ledgerDirs) { - org.apache.hw_v4_1_0.bookkeeper.conf.ServerConfiguration conf = - new org.apache.hw_v4_1_0.bookkeeper.conf.ServerConfiguration(); - conf.setBookiePort(port); - conf.setZkServers(zkServers); - conf.setJournalDirName(journalDir.getPath()); - String[] ledgerDirNames = new String[ledgerDirs.length]; - for (int i=0; i<ledgerDirs.length; i++) { - ledgerDirNames[i] = ledgerDirs[i].getPath(); - } - conf.setLedgerDirNames(ledgerDirNames); - return conf; - } - - protected org.apache.hw_v4_1_0.bookkeeper.proto.BookieServer startBookie( - org.apache.hw_v4_1_0.bookkeeper.conf.ServerConfiguration conf) throws Exception { - org.apache.hw_v4_1_0.bookkeeper.proto.BookieServer server - = new org.apache.hw_v4_1_0.bookkeeper.proto.BookieServer(conf); - server.start(); - - int port = conf.getBookiePort(); - while (zkUtil.getZooKeeperClient().exists( - "/ledgers/available/" + InetAddress.getLocalHost().getHostAddress() + ":" + port, - false) == null) { - Thread.sleep(500); - } - return server; - } - } - - /** - * Version 4.1.0 classes - */ - static class Server410 { - org.apache.hw_v4_1_0.hedwig.server.common.ServerConfiguration conf; - org.apache.hw_v4_1_0.hedwig.server.netty.PubSubServer server; - - Server410(final String zkHosts, final int port, final int sslPort) { - conf = new 
org.apache.hw_v4_1_0.hedwig.server.common.ServerConfiguration() { - @Override - public int getConsumeInterval() { - return CONSUMEINTERVAL; - } - @Override - public String getZkHost() { - return zkHosts; - } - - @Override - public int getServerPort() { - return port; - } - - @Override - public int getSSLServerPort() { - return sslPort; - } - }; - } - - void start() throws Exception { - server = new org.apache.hw_v4_1_0.hedwig.server.netty.PubSubServer(conf); - server.start(); - } - - void stop() throws Exception { - if (null != server) { - server.shutdown(); - } - } - } - - static class Client410 { - org.apache.hw_v4_1_0.hedwig.client.conf.ClientConfiguration conf; - org.apache.hw_v4_1_0.hedwig.client.api.Client client; - org.apache.hw_v4_1_0.hedwig.client.api.Publisher publisher; - org.apache.hw_v4_1_0.hedwig.client.api.Subscriber subscriber; - - class IntMessageHandler implements org.apache.hw_v4_1_0.hedwig.client.api.MessageHandler { - ByteString topic; - ByteString subId; - int next; - - CountDownLatch latch; - - IntMessageHandler(ByteString t, ByteString s, int start, int num) { - this.topic = t; - this.subId = s; - this.next = start; - this.latch = new CountDownLatch(num); - } - - @Override - public void deliver(ByteString t, ByteString s, - org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.Message msg, - org.apache.hw_v4_1_0.hedwig.util.Callback<Void> callback, Object context) { - if (!t.equals(topic) || !s.equals(subId)) { - return; - } - int num = Integer.parseInt(msg.getBody().toStringUtf8()); - if (num == next) { - latch.countDown(); - ++next; - } - callback.operationFinished(context, null); - } - - public boolean await(long timeout, TimeUnit unit) - throws InterruptedException { - return latch.await(timeout, unit); - } - } - - Client410(final String connectString) { - conf = new org.apache.hw_v4_1_0.hedwig.client.conf.ClientConfiguration() { - @Override - public boolean isAutoSendConsumeMessageEnabled() { - return true; - } - @Override - public int 
getConsumedMessagesBufferSize() { - return 1; - } - @Override - protected org.apache.hw_v4_1_0.hedwig.util.HedwigSocketAddress - getDefaultServerHedwigSocketAddress() { - return new org.apache.hw_v4_1_0.hedwig.util.HedwigSocketAddress(connectString); - } - }; - client = new org.apache.hw_v4_1_0.hedwig.client.HedwigClient(conf); - publisher = client.getPublisher(); - subscriber = client.getSubscriber(); - } - - void close() throws Exception { - if (null != client) { - client.close(); - } - } - - org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.MessageSeqId publish( - ByteString topic, ByteString data) throws Exception { - org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.Message message = - org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.Message.newBuilder() - .setBody(data).build(); - publisher.publish(topic, message); - return null; - } - - void publishInts(ByteString topic, int start, int num) throws Exception { - for (int i=0; i<num; i++) { - org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.Message msg = - org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.Message.newBuilder().setBody(ByteString.copyFromUtf8("" + (start+i))).build(); - publisher.publish(topic, msg); - } - } - - void sendXExpectLastY(ByteString topic, ByteString subid, final int x, final int y) - throws Exception { - for (int i=0; i<x; i++) { - publisher.publish(topic, org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.Message.newBuilder().setBody( - ByteString.copyFromUtf8(String.valueOf(i))).build()); - } - subscriber.subscribe(topic, subid, org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.ATTACH); - - final AtomicInteger expected = new AtomicInteger(x - y); - final CountDownLatch latch = new CountDownLatch(1); - subscriber.startDelivery(topic, subid, new org.apache.hw_v4_1_0.hedwig.client.api.MessageHandler() { - @Override - synchronized public void deliver(ByteString topic, ByteString subscriberId, - 
org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.Message msg, - org.apache.hw_v4_1_0.hedwig.util.Callback<Void> callback, Object context) { - try { - int value = Integer.valueOf(msg.getBody().toStringUtf8()); - if (value == expected.get()) { - expected.incrementAndGet(); - } else { - logger.error("Did not receive expected value, expected {}, got {}", - expected.get(), value); - expected.set(0); - latch.countDown(); - } - if (expected.get() == x) { - latch.countDown(); - } - callback.operationFinished(context, null); - } catch (Exception e) { - logger.error("Received bad message", e); - latch.countDown(); - } - } - }); - assertTrue("Timed out waiting for messages Y is " + y + " expected is currently " - + expected.get(), latch.await(10, TimeUnit.SECONDS)); - assertEquals("Should be expected message with " + x, x, expected.get()); - subscriber.stopDelivery(topic, subid); - subscriber.closeSubscription(topic, subid); - Thread.sleep(1000); // give server time to run disconnect logic (BOOKKEEPER-513) - } - - void subscribe(ByteString topic, ByteString subscriberId) throws Exception { - org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.SubscriptionOptions options = - org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.SubscriptionOptions.newBuilder() - .setCreateOrAttach(org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE_OR_ATTACH).build(); - subscribe(topic, subscriberId, options); - } - - void subscribe(ByteString topic, ByteString subscriberId, - org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.SubscriptionOptions options) throws Exception { - subscriber.subscribe(topic, subscriberId, options); - } - - void closeSubscription(ByteString topic, ByteString subscriberId) throws Exception { - subscriber.closeSubscription(topic, subscriberId); - Thread.sleep(1000); // give server time to run disconnect logic (BOOKKEEPER-513) - } - - void receiveInts(ByteString topic, ByteString subscriberId, int start, int num) throws 
Exception { - IntMessageHandler msgHandler = new IntMessageHandler(topic, subscriberId, start, num); - subscriber.startDelivery(topic, subscriberId, msgHandler); - msgHandler.await(num, TimeUnit.SECONDS); - subscriber.stopDelivery(topic, subscriberId); - } - } - - /** - * 4.2.0 Version - */ - static class BookKeeperCluster420{ - - int numBookies; - List<org.apache.hw_v4_2_0.bookkeeper.conf.ServerConfiguration> bkConfs; - List<org.apache.hw_v4_2_0.bookkeeper.proto.BookieServer> bks; - - - BookKeeperCluster420(int numBookies) { - this.numBookies = numBookies; - } - - public void start() throws Exception { - zkUtil.startServer(); - - bks = new LinkedList<org.apache.hw_v4_2_0.bookkeeper.proto.BookieServer>(); - bkConfs = new LinkedList<org.apache.hw_v4_2_0.bookkeeper.conf.ServerConfiguration>(); - - for (int i=0; i<numBookies; i++) { - startBookieServer(); - } - } - - public void stop() throws Exception { - for (org.apache.hw_v4_2_0.bookkeeper.proto.BookieServer bs : bks) { - bs.shutdown(); - } - bks.clear(); - - zkUtil.killServer(); - } - - protected void startBookieServer() throws Exception { - int port = PortManager.nextFreePort(); - File tmpDir = org.apache.hw_v4_2_0.hedwig.util.FileUtils.createTempDirectory( - getClass().getName() + port, "test"); - org.apache.hw_v4_2_0.bookkeeper.conf.ServerConfiguration conf = newServerConfiguration( - port, zkUtil.getZooKeeperConnectString(), tmpDir, new File[] { tmpDir }); - bks.add(startBookie(conf)); - bkConfs.add(conf); - } - - protected org.apache.hw_v4_2_0.bookkeeper.conf.ServerConfiguration newServerConfiguration( - int port, String zkServers, File journalDir, File[] ledgerDirs) { - org.apache.hw_v4_2_0.bookkeeper.conf.ServerConfiguration conf = - new org.apache.hw_v4_2_0.bookkeeper.conf.ServerConfiguration(); - conf.setBookiePort(port); - conf.setZkServers(zkServers); - conf.setJournalDirName(journalDir.getPath()); - String[] ledgerDirNames = new String[ledgerDirs.length]; - for (int i=0; i<ledgerDirs.length; i++) { - 
ledgerDirNames[i] = ledgerDirs[i].getPath(); - } - conf.setLedgerDirNames(ledgerDirNames); - return conf; - } - - protected org.apache.hw_v4_2_0.bookkeeper.proto.BookieServer startBookie( - org.apache.hw_v4_2_0.bookkeeper.conf.ServerConfiguration conf) throws Exception { - org.apache.hw_v4_2_0.bookkeeper.proto.BookieServer server - = new org.apache.hw_v4_2_0.bookkeeper.proto.BookieServer(conf); - server.start(); - - int port = conf.getBookiePort(); - while (zkUtil.getZooKeeperClient().exists( - "/ledgers/available/" + InetAddress.getLocalHost().getHostAddress() + ":" + port, - false) == null) { - Thread.sleep(500); - } - return server; - } - } - - static class Server420 { - org.apache.hw_v4_2_0.hedwig.server.common.ServerConfiguration conf; - org.apache.hw_v4_2_0.hedwig.server.netty.PubSubServer server; - - Server420(final String zkHosts, final int port, final int sslPort) { - conf = new org.apache.hw_v4_2_0.hedwig.server.common.ServerConfiguration() { - @Override - public int getConsumeInterval() { - return CONSUMEINTERVAL; - } - - @Override - public String getZkHost() { - return zkHosts; - } - - @Override - public int getServerPort() { - return port; - } - - @Override - public int getSSLServerPort() { - return sslPort; - } - }; - } - - void start() throws Exception { - server = new org.apache.hw_v4_2_0.hedwig.server.netty.PubSubServer(conf); - server.start(); - } - - void stop() throws Exception { - if (null != server) { - server.shutdown(); - } - } - } - - /** - * Current Version - */ - static class BookKeeperClusterCurrent { - - int numBookies; - List<org.apache.bookkeeper.conf.ServerConfiguration> bkConfs; - List<org.apache.bookkeeper.proto.BookieServer> bks; - - - BookKeeperClusterCurrent(int numBookies) { - this.numBookies = numBookies; - } - - public void start() throws Exception { - zkUtil.startServer(); - - bks = new LinkedList<org.apache.bookkeeper.proto.BookieServer>(); - bkConfs = new LinkedList<org.apache.bookkeeper.conf.ServerConfiguration>(); - - 
for (int i=0; i<numBookies; i++) { - startBookieServer(); - } - } - - public void stop() throws Exception { - for (org.apache.bookkeeper.proto.BookieServer bs : bks) { - bs.shutdown(); - } - bks.clear(); - - zkUtil.killServer(); - } - - protected void startBookieServer() throws Exception { - int port = PortManager.nextFreePort(); - File tmpDir = org.apache.hedwig.util.FileUtils.createTempDirectory( - getClass().getName() + port, "test"); - org.apache.bookkeeper.conf.ServerConfiguration conf = newServerConfiguration( - port, zkUtil.getZooKeeperConnectString(), tmpDir, new File[] { tmpDir }); - conf.setAllowLoopback(true); - bks.add(startBookie(conf)); - bkConfs.add(conf); - } - - protected org.apache.bookkeeper.conf.ServerConfiguration newServerConfiguration( - int port, String zkServers, File journalDir, File[] ledgerDirs) { - org.apache.bookkeeper.conf.ServerConfiguration conf = - new org.apache.bookkeeper.conf.ServerConfiguration(); - conf.setAllowLoopback(true); - conf.setBookiePort(port); - conf.setZkServers(zkServers); - conf.setJournalDirName(journalDir.getPath()); - String[] ledgerDirNames = new String[ledgerDirs.length]; - for (int i=0; i<ledgerDirs.length; i++) { - ledgerDirNames[i] = ledgerDirs[i].getPath(); - } - conf.setLedgerDirNames(ledgerDirNames); - return conf; - } - - protected org.apache.bookkeeper.proto.BookieServer startBookie( - org.apache.bookkeeper.conf.ServerConfiguration conf) throws Exception { - org.apache.bookkeeper.proto.BookieServer server - = new org.apache.bookkeeper.proto.BookieServer(conf); - server.start(); - - int port = conf.getBookiePort(); - while (zkUtil.getZooKeeperClient().exists( - "/ledgers/available/" + InetAddress.getLocalHost().getHostAddress() + ":" + port, - false) == null) { - Thread.sleep(500); - } - return server; - } - } - - static class ServerCurrent { - org.apache.hedwig.server.common.ServerConfiguration conf; - org.apache.hedwig.server.netty.PubSubServer server; - - ServerCurrent(final String zkHosts, final 
int port, final int sslPort) { - conf = new org.apache.hedwig.server.common.ServerConfiguration() { - @Override - public int getConsumeInterval() { - return CONSUMEINTERVAL; - } - - @Override - public String getZkHost() { - return zkHosts; - } - - @Override - public int getServerPort() { - return port; - } - - @Override - public int getSSLServerPort() { - return sslPort; - } - }; - } - - void start() throws Exception { - server = new org.apache.hedwig.server.netty.PubSubServer(conf); - server.start(); - } - - void stop() throws Exception { - if (null != server) { - server.shutdown(); - } - } - } - - static class ClientCurrent { - org.apache.hedwig.client.conf.ClientConfiguration conf; - org.apache.hedwig.client.api.Client client; - org.apache.hedwig.client.api.Publisher publisher; - org.apache.hedwig.client.api.Subscriber subscriber; - - class IntMessageHandler implements org.apache.hedwig.client.api.MessageHandler { - ByteString topic; - ByteString subId; - int next; - - CountDownLatch latch; - - IntMessageHandler(ByteString t, ByteString s, int start, int num) { - this.topic = t; - this.subId = s; - this.next = start; - this.latch = new CountDownLatch(num); - } - - @Override - public void deliver(ByteString t, ByteString s, - org.apache.hedwig.protocol.PubSubProtocol.Message msg, - org.apache.hedwig.util.Callback<Void> callback, Object context) { - if (!t.equals(topic) || !s.equals(subId)) { - return; - } - int num = Integer.parseInt(msg.getBody().toStringUtf8()); - if (num == next) { - latch.countDown(); - ++next; - } - callback.operationFinished(context, null); - } - - public boolean await(long timeout, TimeUnit unit) - throws InterruptedException { - return latch.await(timeout, unit); - } - } - - ClientCurrent(final String connectString) { - this(true, connectString); - } - - ClientCurrent(final boolean autoConsumeEnabled, final String connectString) { - conf = new org.apache.hedwig.client.conf.ClientConfiguration() { - @Override - public boolean 
isAutoSendConsumeMessageEnabled() { - return autoConsumeEnabled; - } - @Override - public int getConsumedMessagesBufferSize() { - return 1; - } - @Override - protected HedwigSocketAddress getDefaultServerHedwigSocketAddress() { - return new HedwigSocketAddress(connectString); - } - }; - client = new org.apache.hedwig.client.HedwigClient(conf); - publisher = client.getPublisher(); - subscriber = client.getSubscriber(); - } - - void close() throws Exception { - if (null != client) { - client.close(); - } - } - - org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId publish( - ByteString topic, ByteString data) throws Exception { - org.apache.hedwig.protocol.PubSubProtocol.Message message = - org.apache.hedwig.protocol.PubSubProtocol.Message.newBuilder() - .setBody(data).build(); - org.apache.hedwig.protocol.PubSubProtocol.PublishResponse resp = - publisher.publish(topic, message); - if (null == resp) { - return null; - } - return resp.getPublishedMsgId(); - } - - void publishInts(ByteString topic, int start, int num) throws Exception { - for (int i=0; i<num; i++) { - org.apache.hedwig.protocol.PubSubProtocol.Message msg = - org.apache.hedwig.protocol.PubSubProtocol.Message.newBuilder().setBody(ByteString.copyFromUtf8("" + (start+i))).build(); - publisher.publish(topic, msg); - } - } - - void sendXExpectLastY(ByteString topic, ByteString subid, final int x, final int y) - throws Exception { - for (int i=0; i<x; i++) { - publisher.publish(topic, org.apache.hedwig.protocol.PubSubProtocol.Message.newBuilder().setBody( - ByteString.copyFromUtf8(String.valueOf(i))).build()); - } - org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions opts - = org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions.newBuilder() - .setCreateOrAttach(org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.ATTACH) - .build(); - subscriber.subscribe(topic, subid, opts); - - final AtomicInteger expected = new AtomicInteger(x - y); - final CountDownLatch latch = 
new CountDownLatch(1); - subscriber.startDelivery(topic, subid, new org.apache.hedwig.client.api.MessageHandler() { - @Override - synchronized public void deliver(ByteString topic, ByteString subscriberId, - org.apache.hedwig.protocol.PubSubProtocol.Message msg, - org.apache.hedwig.util.Callback<Void> callback, Object context) { - try { - int value = Integer.valueOf(msg.getBody().toStringUtf8()); - if (value == expected.get()) { - expected.incrementAndGet(); - } else { - logger.error("Did not receive expected value, expected {}, got {}", - expected.get(), value); - expected.set(0); - latch.countDown(); - } - if (expected.get() == x) { - latch.countDown(); - } - callback.operationFinished(context, null); - } catch (Exception e) { - logger.error("Received bad message", e); - latch.countDown(); - } - } - }); - assertTrue("Timed out waiting for messages Y is " + y + " expected is currently " - + expected.get(), latch.await(10, TimeUnit.SECONDS)); - assertEquals("Should be expected message with " + x, x, expected.get()); - subscriber.stopDelivery(topic, subid); - subscriber.closeSubscription(topic, subid); - } - - void receiveNumModM(final ByteString topic, final ByteString subid, - final int start, final int num, final int M) throws Exception { - org.apache.hedwig.filter.ServerMessageFilter filter = - new org.apache.hedwig.filter.ServerMessageFilter() { - - @Override - public org.apache.hedwig.filter.ServerMessageFilter - initialize(Configuration conf) { - // do nothing - return this; - } - - @Override - public void uninitialize() { - // do nothing; - } - - @Override - public org.apache.hedwig.filter.MessageFilterBase - setSubscriptionPreferences(ByteString topic, ByteString subscriberId, - org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences preferences) { - // do nothing; - return this; - } - - @Override - public boolean testMessage(org.apache.hedwig.protocol.PubSubProtocol.Message msg) { - int value = Integer.valueOf(msg.getBody().toStringUtf8()); - 
return 0 == value % M; - } - }; - filter.initialize(conf.getConf()); - - - org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions opts - = org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions.newBuilder() - .setCreateOrAttach(org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.ATTACH) - .build(); - subscriber.subscribe(topic, subid, opts); - final int base = start + M - start % M; - final AtomicInteger expected = new AtomicInteger(base); - final CountDownLatch latch = new CountDownLatch(1); - subscriber.startDeliveryWithFilter(topic, subid, new org.apache.hedwig.client.api.MessageHandler() { - synchronized public void deliver(ByteString topic, ByteString subscriberId, - org.apache.hedwig.protocol.PubSubProtocol.Message msg, - org.apache.hedwig.util.Callback<Void> callback, Object context) { - try { - int value = Integer.valueOf(msg.getBody().toStringUtf8()); - // duplicated messages received, ignore them - if (value > start) { - if (value == expected.get()) { - expected.addAndGet(M); - } else { - logger.error("Did not receive expected value, expected {}, got {}", - expected.get(), value); - expected.set(0); - latch.countDown(); - } - if (expected.get() == (base + num * M)) { - latch.countDown(); - } - } - callback.operationFinished(context, null); - } catch (Exception e) { - logger.error("Received bad message", e); - latch.countDown(); - } - } - }, (org.apache.hedwig.filter.ClientMessageFilter) filter); - assertTrue("Timed out waiting for messages mod " + M + " expected is " + expected.get(), - latch.await(10, TimeUnit.SECONDS)); - assertEquals("Should be expected message with " + (base + num * M), (base + num*M), expected.get()); - subscriber.stopDelivery(topic, subid); - filter.uninitialize(); - subscriber.closeSubscription(topic, subid); - } - - void subscribe(ByteString topic, ByteString subscriberId) throws Exception { - org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions options = - 
org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions.newBuilder() - .setCreateOrAttach(org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE_OR_ATTACH).build(); - subscribe(topic, subscriberId, options); - } - - void subscribe(ByteString topic, ByteString subscriberId, - org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions options) throws Exception { - subscriber.subscribe(topic, subscriberId, options); - } - - void closeSubscription(ByteString topic, ByteString subscriberId) throws Exception { - subscriber.closeSubscription(topic, subscriberId); - } - - void receiveInts(ByteString topic, ByteString subscriberId, int start, int num) throws Exception { - IntMessageHandler msgHandler = new IntMessageHandler(topic, subscriberId, start, num); - subscriber.startDelivery(topic, subscriberId, msgHandler); - msgHandler.await(num, TimeUnit.SECONDS); - subscriber.stopDelivery(topic, subscriberId); - } - - // throttle doesn't work talking with 41 server - void throttleX41(ByteString topic, ByteString subid, final int X) - throws Exception { - org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions options = - org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions.newBuilder() - .setCreateOrAttach(org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE_OR_ATTACH) - .setMessageWindowSize(X) .build(); - subscribe(topic, subid, options); - closeSubscription(topic, subid); - publishInts(topic, 1, 3*X); - subscribe(topic, subid); - - final AtomicInteger expected = new AtomicInteger(1); - final CountDownLatch throttleLatch = new CountDownLatch(1); - final CountDownLatch nonThrottleLatch = new CountDownLatch(1); - subscriber.startDelivery(topic, subid, new org.apache.hedwig.client.api.MessageHandler() { - @Override - public synchronized void deliver(ByteString topic, ByteString subscriberId, - org.apache.hedwig.protocol.PubSubProtocol.Message msg, - org.apache.hedwig.util.Callback<Void> callback, 
Object context) { - try { - int value = Integer.valueOf(msg.getBody().toStringUtf8()); - logger.debug("Received message {},", value); - - if (value == expected.get()) { - expected.incrementAndGet(); - } else { - // error condition - logger.error("Did not receive expected value, expected {}, got {}", - expected.get(), value); - expected.set(0); - throttleLatch.countDown(); - nonThrottleLatch.countDown(); - } - if (expected.get() > X+1) { - throttleLatch.countDown(); - } - if (expected.get() == (3 * X + 1)) { - nonThrottleLatch.countDown(); - } - callback.operationFinished(context, null); - } catch (Exception e) { - logger.error("Received bad message", e); - throttleLatch.countDown(); - nonThrottleLatch.countDown(); - } - } - }); - assertTrue("Should Receive more messages than throttle value " + X, - throttleLatch.await(10, TimeUnit.SECONDS)); - - assertTrue("Timed out waiting for messages " + (3*X + 1), - nonThrottleLatch.await(10, TimeUnit.SECONDS)); - assertEquals("Should be expected message with " + (3*X + 1), - 3*X + 1, expected.get()); - - subscriber.stopDelivery(topic, subid); - closeSubscription(topic, subid); - } - } - - /** - * Test compatability of message bound between version 4.0.0 and - * current version. - * - * 1) message bound doesn't take effects on 4.0.0 server. 
- * 2) message bound take effects on both 4.1.0 and current server - */ - @Test(timeout=60000) - public void testMessageBoundCompat() throws Exception { - ByteString topic = ByteString.copyFromUtf8("testMessageBoundCompat"); - ByteString subid = ByteString.copyFromUtf8("mysub"); - - int port = PortManager.nextFreePort(); - int sslPort = PortManager.nextFreePort(); - - // start bookkeeper 400 - BookKeeperCluster400 bkc400 = new BookKeeperCluster400(3); - bkc400.start(); - - // start 400 server - Server400 s400 = new Server400(zkUtil.getZooKeeperConnectString(), port, sslPort); - s400.start(); - - org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions options5cur = - org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions.newBuilder() - .setCreateOrAttach(org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE_OR_ATTACH) - .setMessageBound(5).build(); - - ClientCurrent ccur = new ClientCurrent("localhost:" + port + ":" + sslPort); - ccur.subscribe(topic, subid, options5cur); - ccur.closeSubscription(topic, subid); - ccur.sendXExpectLastY(topic, subid, 50, 50); - - // stop 400 servers - s400.stop(); - bkc400.stop(); - - // start bookkeeper 410 - BookKeeperCluster410 bkc410 = new BookKeeperCluster410(3); - bkc410.start(); - - // start 410 server - Server410 s410 = new Server410(zkUtil.getZooKeeperConnectString(), port, sslPort); - s410.start(); - - ccur.subscribe(topic, subid, options5cur); - ccur.closeSubscription(topic, subid); - ccur.sendXExpectLastY(topic, subid, 50, 5); - - // stop 410 servers - s410.stop(); - bkc410.stop(); - - // start bookkeeper current - BookKeeperCluster420 bkc420 = new BookKeeperCluster420(3); - bkc420.start(); - - // start 420 server - Server420 s420 = new Server420(zkUtil.getZooKeeperConnectString(), port, sslPort); - s420.start(); - - ccur.subscribe(topic, subid, options5cur); - ccur.closeSubscription(topic, subid); - ccur.sendXExpectLastY(topic, subid, 50, 5); - - // stop 420 server - s420.stop(); - 
bkc420.stop(); - - ccur.close(); - } - - /** - * Test compatability of publish interface between version 4.1.0 - * and current verison. - * - * 1) 4.1.0 client could talk with current server. - * 2) current client could talk with 4.1.0 server, - * but no message seq id would be returned - */ - @Test(timeout=60000) - public void testPublishCompat410() throws Exception { - ByteString topic = ByteString.copyFromUtf8("TestPublishCompat410"); - ByteString data = ByteString.copyFromUtf8("testdata"); - - // start bookkeeper 410 - BookKeeperCluster410 bkc410 = new BookKeeperCluster410(3); - bkc410.start(); - - int port = PortManager.nextFreePort(); - int sslPort = PortManager.nextFreePort(); - - // start 410 server - Server410 s410 = new Server410(zkUtil.getZooKeeperConnectString(), port, sslPort); - s410.start(); - - ClientCurrent ccur = new ClientCurrent("localhost:"+port+":"+sslPort); - Client410 c410 = new Client410("localhost:"+port+":"+sslPort); - - // client c410 could publish message to 410 server - assertNull(c410.publish(topic, data)); - // client ccur could publish message to 410 server - // but no message seq id would be returned - assertNull(ccur.publish(topic, data)); - - // stop 410 server - s410.stop(); - - // start 420 server - Server420 s420 = new Server420(zkUtil.getZooKeeperConnectString(), port, sslPort); - s420.start(); - - // client c410 could publish message to 410 server - // but no message seq id would be returned - assertNull(c410.publish(topic, data)); - // client ccur could publish message to current server - assertNotNull(ccur.publish(topic, data)); - - ccur.close(); - c410.close(); - - // stop 420 server - s420.stop(); - bkc410.stop(); - } - - /** - * Test compatability between version 4.1.0 and the current version. - * - * A current server could read subscription data recorded by 4.1.0 server. 
- */ - @Test(timeout=60000) - public void testSubscriptionDataCompat410() throws Exception { - ByteString topic = ByteString.copyFromUtf8("TestCompat410"); - ByteString sub410 = ByteString.copyFromUtf8("sub410"); - ByteString subcur = ByteString.copyFromUtf8("subcur"); - - // start bookkeeper 410 - BookKeeperCluster410 bkc410 = new BookKeeperCluster410(3); - bkc410.start(); - - int port = PortManager.nextFreePort(); - int sslPort = PortManager.nextFreePort(); - - // start 410 server - Server410 s410 = new Server410(zkUtil.getZooKeeperConnectString(), port, sslPort); - s410.start(); - - Client410 c410 = new Client410("localhost:"+port+":"+sslPort); - c410.subscribe(topic, sub410); - c410.closeSubscription(topic, sub410); - Thread.sleep(1000); // give server time to run disconnect logic (BOOKKEEPER-513) - - ClientCurrent ccur = new ClientCurrent("localhost:"+port+":"+sslPort); - ccur.subscribe(topic, subcur); - ccur.closeSubscription(topic, subcur); - - // publish messages using old client - c410.publishInts(topic, 0, 10); - // stop 410 server - s410.stop(); - - // start 420 server - Server420 s420 = new Server420(zkUtil.getZooKeeperConnectString(), - port, sslPort); - s420.start(); - - c410.subscribe(topic, sub410); - c410.receiveInts(topic, sub410, 0, 10); - - ccur.subscribe(topic, subcur); - ccur.receiveInts(topic, subcur, 0, 10); - - // publish messages using current client - ccur.publishInts(topic, 10, 10); - - c410.receiveInts(topic, sub410, 10, 10); - ccur.receiveInts(topic, subcur, 10, 10); - - // stop 420 server - s420.stop(); - - c410.close(); - ccur.close(); - - // stop bookkeeper cluster - bkc410.stop(); - } - - /** - * Test compatability between version 4.1.0 and the current version. - * - * A 4.1.0 client could not update message bound, while current could do it. 
- */ - @Test(timeout=60000) - public void testUpdateMessageBoundCompat410() throws Exception { - ByteString topic = ByteString.copyFromUtf8("TestUpdateMessageBoundCompat410"); - ByteString subid = ByteString.copyFromUtf8("mysub"); - - // start bookkeeper - BookKeeperCluster420 bkc420 = new BookKeeperCluster420(3); - bkc420.start(); - - int port = PortManager.nextFreePort(); - int sslPort = PortManager.nextFreePort(); - - // start hub server - Server420 s420 = new Server420(zkUtil.getZooKeeperConnectString(), - port, sslPort); - s420.start(); - - org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions options5cur = - org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions.newBuilder() - .setCreateOrAttach(org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE_OR_ATTACH) - .setMessageBound(5).build(); - org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.SubscriptionOptions options5v410 = - org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.SubscriptionOptions.newBuilder() - .setCreateOrAttach(org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE_OR_ATTACH) - .setMessageBound(5).build(); - org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.SubscriptionOptions options20v410 = - org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.SubscriptionOptions.newBuilder() - .setCreateOrAttach(org.apache.hw_v4_1_0.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE_OR_ATTACH) - .setMessageBound(20).build(); - - Client410 c410 = new Client410("localhost:"+port+":"+sslPort); - c410.subscribe(topic, subid, options20v410); - c410.closeSubscription(topic, subid); - Thread.sleep(1000); // give server time to run disconnect logic (BOOKKEEPER-513) - - c410.sendXExpectLastY(topic, subid, 50, 20); - - c410.subscribe(topic, subid, options5v410); - c410.closeSubscription(topic, subid); - Thread.sleep(1000); // give server time to run disconnect logic (BOOKKEEPER-513) - - // the message bound isn't 
updated. - c410.sendXExpectLastY(topic, subid, 50, 20); - - ClientCurrent ccur = new ClientCurrent("localhost:"+port+":"+sslPort); - ccur.subscribe(topic, subid, options5cur); - ccur.closeSubscription(topic, subid); - Thread.sleep(1000); // give server time to run disconnect logic (BOOKKEEPER-513) - - // the message bound should be updated. - c410.sendXExpectLastY(topic, subid, 50, 5); - - // stop 420 server - s420.stop(); - - c410.close(); - ccur.close(); - - // stop bookkeeper cluster - bkc420.stop(); - } - - /** - * Test compatability between version 4.1.0 and the current version. - * - * A current client running message filter would fail on 4.1.0 hub servers. - */ - @Test(timeout=60000) - public void testClientMessageFilterCompat410() throws Exception { - ByteString topic = ByteString.copyFromUtf8("TestUpdateMessageBoundCompat410"); - ByteString subid = ByteString.copyFromUtf8("mysub"); - - // start bookkeeper - BookKeeperCluster410 bkc410 = new BookKeeperCluster410(3); - bkc410.start(); - - int port = PortManager.nextFreePort(); - int sslPort = PortManager.nextFreePort(); - - // start hub server 410 - Server410 s410 = new Server410(zkUtil.getZooKeeperConnectString(), port, sslPort); - s410.start(); - - ClientCurrent ccur = new ClientCurrent("localhost:"+port+":"+sslPort); - ccur.subscribe(topic, subid); - ccur.closeSubscription(topic, subid); - - ccur.publishInts(topic, 0, 100); - try { - ccur.receiveNumModM(topic, subid, 0, 50, 2); - fail("client-side filter could not run on 4.1.0 hub server"); - } catch (Exception e) { - logger.info("Should fail to run client-side message filter on 4.1.0 hub server.", e); - ccur.closeSubscription(topic, subid); - } - - // stop 410 server - s410.stop(); - // stop bookkeeper cluster - bkc410.stop(); - } - - /** - * Test compatability between version 4.1.0 and the current version. - * - * Server side throttling does't work when current client connects to old version - * server. 
*/
    @Test(timeout=60000)
    public void testServerSideThrottleCompat410() throws Exception {
        final ByteString topic = ByteString.copyFromUtf8("TestServerSideThrottleCompat410");
        final ByteString subid = ByteString.copyFromUtf8("mysub");

        // 4.1.0 bookies, then a 4.1.0 hub on freshly allocated ports
        BookKeeperCluster410 bookies410 = new BookKeeperCluster410(3);
        bookies410.start();

        final int port = PortManager.nextFreePort();
        final int sslPort = PortManager.nextFreePort();

        Server410 hub410 = new Server410(zkUtil.getZooKeeperConnectString(), port, sslPort);
        hub410.start();

        // current client, constructed with its boolean flag set to false,
        // requesting a message window of 10
        ClientCurrent ccur = new ClientCurrent(false, "localhost:"+port+":"+sslPort);
        ccur.throttleX41(topic, subid, 10);

        ccur.close();

        // tear down in reverse order of start-up
        hub410.stop();
        bookies410.stop();
    }
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/TestPubSubServerStartup.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/TestPubSubServerStartup.java b/hedwig-server/src/test/java/org/apache/hedwig/server/TestPubSubServerStartup.java deleted file mode 100644 index 632ea43..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/TestPubSubServerStartup.java +++ /dev/null @@ -1,138 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
*/
package org.apache.hedwig.server;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;

import junit.framework.Assert;

import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.server.LoggingExceptionHandler;
import org.apache.bookkeeper.test.PortManager;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.hedwig.server.common.ServerConfiguration;
import org.apache.hedwig.server.netty.PubSubServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.test.ClientBase;
import org.junit.Test;
import org.apache.hedwig.util.FileUtils;

/**
 * Repeatedly starts a ZooKeeper server and a Hedwig hub configured from a
 * file, then shuts both down again.
 */
public class TestPubSubServerStartup {

    private static final Logger logger = LoggerFactory.getLogger(TestPubSubServerStartup.class);

    /**
     * Start-up zookeeper + pubsubserver reading from a config URL. Then stop
     * and cleanup.
     *
     * Loop over that.
     *
     * If the pubsub server does not wait for its zookeeper client to be
     * connected, the pubsub server will fail at startup.
     *
     */
    @Test(timeout=60000)
    public void testPubSubServerInstantiationWithConfig() throws Exception {
        for (int i = 0; i < 10; i++) {
            logger.info("iteration " + i);
            instantiateAndDestroyPubSubServer();
        }
    }

    /**
     * Runs one start/stop cycle: writes a hub configuration file, starts a
     * ZooKeeper server, creates the {@code /ledgers} nodes, starts the hub,
     * and finally shuts everything down.
     *
     * Any failure to construct or start the hub propagates to the caller, so
     * a hub that cannot start fails the test instead of being logged and
     * ignored. Servers and temporary files are released in {@code finally}
     * blocks so that one failing iteration does not leak into the next.
     *
     * @throws Exception if ZooKeeper or the hub cannot be started
     */
    private void instantiateAndDestroyPubSubServer() throws IOException, InterruptedException, ConfigurationException,
        MalformedURLException, Exception {
        int zkPort = PortManager.nextFreePort();
        int hwPort = PortManager.nextFreePort();
        int hwSSLPort = PortManager.nextFreePort();
        String hedwigParams = "default_server_host=localhost:" + hwPort + "\n"
                              + "zk_host=localhost:" + zkPort + "\n"
                              + "server_port=" + hwPort + "\n"
                              + "ssl_server_port=" + hwSSLPort + "\n"
                              + "zk_timeout=2000\n";

        // A uniquely named file avoids clashes between concurrent test runs
        // that would otherwise share <tmpdir>/hedwig.cfg.
        File hedwigConfigFile = File.createTempFile("hedwig", ".cfg");
        writeStringToFile(hedwigParams, hedwigConfigFile);

        ClientBase.setupTestEnv();
        File zkTmpDir = FileUtils.createTempDirectory("zookeeper", "test");

        ZooKeeperServer zks = new ZooKeeperServer(zkTmpDir, zkTmpDir, zkPort);

        NIOServerCnxnFactory serverFactory = new NIOServerCnxnFactory();
        serverFactory.configure(new InetSocketAddress(zkPort), 100);
        serverFactory.startup(zks);

        PubSubServer hedwigServer = null;
        try {
            // The result used to be stored in an unused local; check it.
            Assert.assertTrue("zookeeper server did not come up on port " + zkPort,
                              ClientBase.waitForServerUp("127.0.0.1:" + zkPort, 5000));
            ServerConfiguration serverConf = new ServerConfiguration();
            serverConf.loadConf(hedwigConfigFile.toURI().toURL());

            logger.info("Zookeeper server up and running!");

            ZooKeeper zkc = new ZooKeeper("127.0.0.1:" + zkPort, 5000, null);
            try {
                // initialize the zk client with (fake) values
                zkc.create("/ledgers", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                zkc.create("/ledgers/available", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            } finally {
                zkc.close();
            }

            logger.info("starting hedwig broker!");
            hedwigServer = new PubSubServer(serverConf, new ClientConfiguration(), new LoggingExceptionHandler());
            // No catch here: a start-up failure is exactly what this test
            // exists to detect.
            hedwigServer.start();
        } finally {
            try {
                if (hedwigServer != null) {
                    hedwigServer.shutdown();
                }
            } finally {
                serverFactory.shutdown();

                zks.shutdown();

                zkTmpDir.delete();
                hedwigConfigFile.delete();

                ClientBase.waitForServerDown("localhost:" + zkPort, 10000);
            }
        }
    }

    /**
     * Replaces the content of {@code f} with {@code string}.
     *
     * An existing file is deleted first and a new one is created in its
     * place.
     *
     * @param string text to write
     * @param f destination file
     * @throws IOException if the file cannot be created or written
     * @throws RuntimeException if an existing file cannot be deleted, or the
     *         new file already exists when it is created
     */
    public static void writeStringToFile(String string, File f) throws IOException {
        if (f.exists() && !f.delete()) {
            // The old message said "cannot create file", which misreported
            // the step that failed.
            throw new RuntimeException("cannot delete existing file " + f.getAbsolutePath());
        }
        if (!f.createNewFile()) {
            throw new RuntimeException("cannot create new file " + f.getAbsolutePath());
        }

        FileWriter fw = new FileWriter(f);
        try {
            fw.write(string);
        } finally {
            // Close on the failure path as well, so the file handle is not leaked.
            fw.close();
        }
    }
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java b/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java deleted file mode 100644 index 978649a..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hedwig.server.delivery;

import java.util.LinkedList;
import java.util.Queue;

import com.google.protobuf.ByteString;
import org.apache.hedwig.client.data.TopicSubscriber;
import org.apache.hedwig.filter.ServerMessageFilter;
import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
import org.apache.hedwig.util.Callback;

/**
 * {@link DeliveryManager} stand-in that delivers nothing: it records each
 * start/stop request in {@link #lastRequest} and immediately reports success
 * to the supplied callback.
 */
public class StubDeliveryManager implements DeliveryManager {

    /**
     * Snapshot of the arguments of one {@code startServingSubscription} call.
     * The subscription preferences are accepted by the constructor but not kept.
     */
    public static class StartServingRequest {
        public ByteString topic;
        public ByteString subscriberId;
        public MessageSeqId seqIdToStartFrom;
        public DeliveryEndPoint endPoint;
        public ServerMessageFilter filter;

        public StartServingRequest(ByteString topic, ByteString subscriberId,
                                   SubscriptionPreferences preferences,
                                   MessageSeqId seqIdToStartFrom,
                                   DeliveryEndPoint endPoint,
                                   ServerMessageFilter filter) {
            this.topic = topic;
            this.subscriberId = subscriberId;
            this.seqIdToStartFrom = seqIdToStartFrom;
            this.endPoint = endPoint;
            this.filter = filter;
        }

    }

    /**
     * Requests in arrival order: a {@link StartServingRequest} for each start
     * call and a {@link TopicSubscriber} for each stop call.
     */
    public Queue<Object> lastRequest = new LinkedList<Object>();

    /** Records the request, then completes the callback successfully. */
    @Override
    public void startServingSubscription(ByteString topic, ByteString subscriberId,
                                         SubscriptionPreferences preferences,
                                         MessageSeqId seqIdToStartFrom,
                                         DeliveryEndPoint endPoint,
                                         ServerMessageFilter filter,
                                         Callback<Void> cb, Object ctx) {
        StartServingRequest recorded =
            new StartServingRequest(topic, subscriberId, preferences,
                                    seqIdToStartFrom, endPoint, filter);
        lastRequest.add(recorded);
        cb.operationFinished(ctx, null);
    }

    /** Records which subscriber was stopped, then completes the callback successfully. */
    @Override
    public void stopServingSubscriber(ByteString topic, ByteString subscriberId,
                                      SubscriptionEvent event,
                                      Callback<Void> cb, Object ctx) {
        TopicSubscriber stopped = new TopicSubscriber(topic, subscriberId);
        lastRequest.add(stopped);
        cb.operationFinished(ctx, null);
    }

    /** No-op: the stub does not track consumption. */
    @Override
    public void messageConsumed(ByteString topic, ByteString subscriberId,
                                MessageSeqId seqId) {
        // intentionally empty
    }

    /** No-op: there is nothing to start. */
    @Override
    public void start() {
    }

    /** No-op: there is nothing to stop. */
    @Override
    public void stop() {
        // intentionally empty
    }
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestFIFODeliveryManager.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestFIFODeliveryManager.java b/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestFIFODeliveryManager.java deleted file mode 100644 index ebc26f1..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestFIFODeliveryManager.java +++ /dev/null @@ -1,298 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
*/
package org.apache.hedwig.server.delivery;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.filter.PipelineFilter;
import org.apache.hedwig.protocol.PubSubProtocol.Message;
import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
import org.apache.hedwig.server.common.ServerConfiguration;
import org.apache.hedwig.server.topics.StubTopicManager;
import org.apache.hedwig.server.persistence.PersistRequest;
import org.apache.hedwig.server.persistence.PersistenceManager;
import org.apache.hedwig.server.persistence.StubPersistenceManager;
import org.apache.hedwig.server.subscriptions.AllToAllTopologyFilter;
import org.apache.hedwig.util.Callback;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.protobuf.ByteString;

/**
 * Race-condition regression tests for {@link FIFODeliveryManager}.
 */
public class TestFIFODeliveryManager {
    private static final Logger logger = LoggerFactory.getLogger(TestFIFODeliveryManager.class);

    /**
     * Persist callback that counts down a latch on completion and remembers
     * the message id assigned on success.
     */
    static class TestCallback implements Callback<MessageSeqId> {
        AtomicBoolean success = new AtomicBoolean(false);
        final CountDownLatch latch;
        MessageSeqId msgid = null;

        TestCallback(CountDownLatch l) {
            this.latch = l;
        }

        @Override
        public void operationFailed(Object ctx, PubSubException exception) {
            logger.error("Persist operation failed", exception);
            latch.countDown();
        }

        @Override
        public void operationFinished(Object ctx, MessageSeqId resultOfOperation) {
            msgid = resultOfOperation;
            // success is set after msgid so that a reader which observes
            // success == true also observes the id.
            success.set(true);
            latch.countDown();
        }

        /** Returns the persisted id; fails the test if the persist did not succeed. */
        MessageSeqId getId() {
            assertTrue("Persist operation failed", success.get());
            return msgid;
        }
    }

    /**
     * Builds a callback that counts {@code latch} down once, whether the
     * operation finished or failed.
     */
    private static Callback<Void> countDownOn(final CountDownLatch latch) {
        return new Callback<Void>() {
            @Override
            public void operationFinished(Object ctx, Void result) {
                latch.countDown();
            }
            @Override
            public void operationFailed(Object ctx, PubSubException exception) {
                latch.countDown();
            }
        };
    }

    /**
     * Delivery endpoint which puts all responses on a queue
     */
    static class ExecutorDeliveryEndPointWithQueue implements DeliveryEndPoint {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger numResponses = new AtomicInteger(0);
        ConcurrentLinkedQueue<PubSubResponse> queue = new ConcurrentLinkedQueue<PubSubResponse>();

        public void send(final PubSubResponse response, final DeliveryCallback callback) {
            logger.info("Received response {}", response);
            queue.add(response);
            numResponses.incrementAndGet();
            // acknowledge on the executor thread rather than the caller's thread
            executor.submit(new Runnable() {
                public void run() {
                    callback.sendingFinished();
                }
            });
        }

        public void close() {
            executor.shutdown();
        }

        PubSubResponse getNextResponse() {
            return queue.poll();
        }

        int getNumResponses() {
            return numResponses.get();
        }
    }

    /**
     * Test that the FIFO delivery manager executes stopServing and startServing
     * in the correct order.
     *
     * @see <a href="https://issues.apache.org/jira/browse/BOOKKEEPER-539">BOOKKEEPER-539</a>
     */
    @Test(timeout = 60000)
    public void testFIFODeliverySubCloseSubRace() throws Exception {
        ServerConfiguration conf = new ServerConfiguration();
        ByteString topic = ByteString.copyFromUtf8("subRaceTopic");
        ByteString subscriber = ByteString.copyFromUtf8("subRaceSubscriber");

        PersistenceManager pm = new StubPersistenceManager();
        FIFODeliveryManager fdm = new FIFODeliveryManager(new StubTopicManager(conf), pm, conf);
        ExecutorDeliveryEndPointWithQueue dep = new ExecutorDeliveryEndPointWithQueue();
        try {
            SubscriptionPreferences prefs = SubscriptionPreferences.newBuilder().build();

            PipelineFilter filter = new PipelineFilter();
            filter.addLast(new AllToAllTopologyFilter());
            filter.initialize(conf.getConf());
            filter.setSubscriptionPreferences(topic, subscriber, prefs);
            MessageSeqId startId = MessageSeqId.newBuilder().setLocalComponent(1).build();

            // persist the single message both start requests will deliver
            CountDownLatch l = new CountDownLatch(1);
            Message m = Message.newBuilder().setBody(ByteString.copyFromUtf8(String.valueOf(1))).build();
            TestCallback cb = new TestCallback(l);
            pm.persistMessage(new PersistRequest(topic, m, cb, null));
            assertTrue("Persistence never finished", l.await(10, TimeUnit.SECONDS));

            // one count per operation: start, stop, start
            final CountDownLatch oplatch = new CountDownLatch(3);
            fdm.start();
            try {
                fdm.startServingSubscription(topic, subscriber, prefs, startId, dep, filter,
                                             countDownOn(oplatch), null);
                fdm.stopServingSubscriber(topic, subscriber, null,
                                          countDownOn(oplatch), null);
                fdm.startServingSubscription(topic, subscriber, prefs, startId, dep, filter,
                                             countDownOn(oplatch), null);

                assertTrue("Ops never finished", oplatch.await(10, TimeUnit.SECONDS));
                // poll for up to 5 seconds for both deliveries to arrive
                int seconds = 5;
                while (dep.getNumResponses() < 2) {
                    if (seconds-- == 0) {
                        break;
                    }
                    Thread.sleep(1000);
                }
                PubSubResponse r = dep.getNextResponse();
                assertNotNull("There should be a response", r);
                assertTrue("Response should contain a message", r.hasMessage());
                r = dep.getNextResponse();
                assertNotNull("There should be a response", r);
                assertTrue("Response should contain a message", r.hasMessage());
                r = dep.getNextResponse();
                assertNull("There should only be 2 responses", r);
            } finally {
                // stop the delivery manager so its worker does not outlive the test
                fdm.stop();
            }
        } finally {
            // shut the endpoint's executor down so its thread does not leak
            dep.close();
        }
    }

    /**
     * Delivery endpoint that acknowledges consumption back to the delivery
     * manager: immediately for odd local sequence ids, and after a one second
     * delay for even ones.
     */
    static class ExecutorDeliveryEndPoint implements DeliveryEndPoint {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger numDelivered = new AtomicInteger();
        final DeliveryManager dm;

        ExecutorDeliveryEndPoint(DeliveryManager dm) {
            this.dm = dm;
        }

        public void send(final PubSubResponse response, final DeliveryCallback callback) {
            executor.submit(new Runnable() {
                public void run() {
                    if (response.hasMessage()) {
                        MessageSeqId msgid = response.getMessage().getMsgId();
                        if ((msgid.getLocalComponent() % 2) == 1) {
                            dm.messageConsumed(response.getTopic(),
                                               response.getSubscriberId(),
                                               response.getMessage().getMsgId());
                        } else {
                            // delay the consume so it can interleave with later deliveries
                            executor.schedule(new Runnable() {
                                public void run() {
                                    dm.messageConsumed(response.getTopic(),
                                                       response.getSubscriberId(),
                                                       response.getMessage().getMsgId());
                                }
                            }, 1, TimeUnit.SECONDS);
                        }
                    }
                    numDelivered.incrementAndGet();
                    callback.sendingFinished();
                }
            });
        }

        public void close() {
            executor.shutdown();
        }

        int getNumDelivered() {
            return numDelivered.get();
        }
    }

    /**
     * Test throttle race issue caused by messageConsumed and doDeliverNextMessage.
     *
     * @see <a href="https://issues.apache.org/jira/browse/BOOKKEEPER-503">BOOKKEEPER-503</a>
     */
    @Test(timeout = 60000)
    public void testFIFODeliveryThrottlingRace() throws Exception {
        final int numMessages = 20;
        final int throttleSize = 10;
        // configuration whose default message window is throttleSize
        ServerConfiguration conf = new ServerConfiguration() {
            @Override
            public int getDefaultMessageWindowSize() {
                return throttleSize;
            }
        };
        ByteString topic = ByteString.copyFromUtf8("throttlingRaceTopic");
        ByteString subscriber = ByteString.copyFromUtf8("throttlingRaceSubscriber");

        PersistenceManager pm = new StubPersistenceManager();
        FIFODeliveryManager fdm = new FIFODeliveryManager(new StubTopicManager(conf), pm, conf);
        ExecutorDeliveryEndPoint dep = new ExecutorDeliveryEndPoint(fdm);
        try {
            SubscriptionPreferences prefs = SubscriptionPreferences.newBuilder().build();

            PipelineFilter filter = new PipelineFilter();
            filter.addLast(new AllToAllTopologyFilter());
            filter.initialize(conf.getConf());
            filter.setSubscriptionPreferences(topic, subscriber, prefs);

            CountDownLatch l = new CountDownLatch(numMessages);

            // remember the first persist so delivery can start from its id
            TestCallback firstCallback = null;
            for (int i = 0; i < numMessages; i++) {
                Message m = Message.newBuilder().setBody(ByteString.copyFromUtf8(String.valueOf(i))).build();
                TestCallback cb = new TestCallback(l);
                if (firstCallback == null) {
                    firstCallback = cb;
                }
                pm.persistMessage(new PersistRequest(topic, m, cb, null));
            }
            fdm.start();
            try {
                assertTrue("Persistence never finished", l.await(10, TimeUnit.SECONDS));
                fdm.startServingSubscription(topic, subscriber, prefs, firstCallback.getId(), dep, filter,
                        new Callback<Void>() {
                            @Override
                            public void operationFinished(Object ctx, Void result) {
                            }
                            @Override
                            public void operationFailed(Object ctx, PubSubException exception) {
                                // would not happen
                            }
                        }, null);

                int count = 30; // wait for 30 seconds maximum
                while (dep.getNumDelivered() < numMessages) {
                    Thread.sleep(1000);
                    if (count-- == 0) {
                        break;
                    }
                }
                assertEquals("Should have delivered " + numMessages, numMessages, dep.getNumDelivered());
            } finally {
                // stop the delivery manager so its worker does not outlive the test
                fdm.stop();
            }
        } finally {
            // shut the endpoint's executor down so its thread does not leak
            dep.close();
        }
    }

}