http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/SafeAsynBKCallback.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/SafeAsynBKCallback.java b/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/SafeAsynBKCallback.java deleted file mode 100644 index b708ab9..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/SafeAsynBKCallback.java +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.zookeeper; - -import java.util.Enumeration; - -import org.apache.bookkeeper.client.AsyncCallback; -import org.apache.bookkeeper.client.LedgerEntry; -import org.apache.bookkeeper.client.LedgerHandle; - - -public class SafeAsynBKCallback extends SafeAsyncCallback { - - public static abstract class OpenCallback implements AsyncCallback.OpenCallback { - @Override - public void openComplete(int rc, LedgerHandle ledgerHandle, Object ctx) { - try { - safeOpenComplete(rc, ledgerHandle, ctx); - } catch(Throwable t) { - invokeUncaughtExceptionHandler(t); - } - } - - public abstract void safeOpenComplete(int rc, LedgerHandle ledgerHandle, Object ctx); - - } - - public static abstract class CloseCallback implements AsyncCallback.CloseCallback { - @Override - public void closeComplete(int rc, LedgerHandle ledgerHandle, Object ctx) { - try { - safeCloseComplete(rc, ledgerHandle, ctx); - } catch(Throwable t) { - invokeUncaughtExceptionHandler(t); - } - } - - public abstract void safeCloseComplete(int rc, LedgerHandle ledgerHandle, Object ctx) ; - } - - public static abstract class ReadCallback implements AsyncCallback.ReadCallback { - - @Override - public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) { - try { - safeReadComplete(rc, lh, seq, ctx); - } catch(Throwable t) { - invokeUncaughtExceptionHandler(t); - } - - } - - public abstract void safeReadComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx); - } - - public static abstract class CreateCallback implements AsyncCallback.CreateCallback { - - @Override - public void createComplete(int rc, LedgerHandle lh, Object ctx) { - try { - safeCreateComplete(rc, lh, ctx); - } catch(Throwable t) { - invokeUncaughtExceptionHandler(t); - } - - } - - public abstract void safeCreateComplete(int rc, LedgerHandle lh, Object ctx); - - - } - - public static abstract class AddCallback implements AsyncCallback.AddCallback { - - @Override - public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) { - try { - safeAddComplete(rc, lh, entryId, ctx); - } catch(Throwable t) { - invokeUncaughtExceptionHandler(t); - } - } - - public abstract void safeAddComplete(int rc, LedgerHandle lh, long entryId, Object ctx); - - } - -} -
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/SafeAsyncCallback.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/SafeAsyncCallback.java b/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/SafeAsyncCallback.java deleted file mode 100644 index 11ca3ff..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/SafeAsyncCallback.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.zookeeper; - -import java.lang.Thread.UncaughtExceptionHandler; - -import org.apache.hedwig.server.common.TerminateJVMExceptionHandler; - -public class SafeAsyncCallback { - static UncaughtExceptionHandler uncaughtExceptionHandler = new TerminateJVMExceptionHandler(); - - public static void setUncaughtExceptionHandler(UncaughtExceptionHandler uncaughtExceptionHandler) { - SafeAsyncCallback.uncaughtExceptionHandler = uncaughtExceptionHandler; - } - - static void invokeUncaughtExceptionHandler(Throwable t) { - Thread thread = Thread.currentThread(); - uncaughtExceptionHandler.uncaughtException(thread, t); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/SafeAsyncZKCallback.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/SafeAsyncZKCallback.java b/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/SafeAsyncZKCallback.java deleted file mode 100644 index 4e519f3..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/SafeAsyncZKCallback.java +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.zookeeper; - -import java.util.List; - -import org.apache.zookeeper.AsyncCallback; -import org.apache.zookeeper.data.ACL; -import org.apache.zookeeper.data.Stat; - -public class SafeAsyncZKCallback extends SafeAsyncCallback { - public static abstract class StatCallback implements AsyncCallback.StatCallback { - public void processResult(int rc, String path, Object ctx, Stat stat) { - try { - safeProcessResult(rc, path, ctx, stat); - } catch (Throwable t) { - invokeUncaughtExceptionHandler(t); - } - } - - public abstract void safeProcessResult(int rc, String path, Object ctx, Stat stat); - } - - public static abstract class DataCallback implements AsyncCallback.DataCallback { - public void processResult(int rc, String path, Object ctx, byte data[], Stat stat) { - try { - safeProcessResult(rc, path, ctx, data, stat); - } catch (Throwable t) { - invokeUncaughtExceptionHandler(t); - } - } - - public abstract void safeProcessResult(int rc, String path, Object ctx, byte data[], Stat stat); - } - - public static abstract class ACLCallback implements AsyncCallback.ACLCallback { - public void processResult(int rc, String path, Object ctx, List<ACL> acl, Stat stat) { - try { - safeProcessResult(rc, path, ctx, acl, stat); - } catch (Throwable t) { - invokeUncaughtExceptionHandler(t); - } - } - - public abstract void safeProcessResult(int rc, String path, Object ctx, List<ACL> acl, Stat stat); - } - - public static abstract class ChildrenCallback implements AsyncCallback.ChildrenCallback { - public void processResult(int rc, String path, Object ctx, List<String> children) { - try { - safeProcessResult(rc, path, ctx, children); - } catch (Throwable t) { - invokeUncaughtExceptionHandler(t); - } - } - - public abstract void safeProcessResult(int rc, String path, Object ctx, List<String> children); - } - - public static abstract class StringCallback implements AsyncCallback.StringCallback { - public void processResult(int rc, String path, Object ctx, String name) { - try { - safeProcessResult(rc, path, ctx, name); - } catch (Throwable t) { - invokeUncaughtExceptionHandler(t); - } - } - - public abstract void safeProcessResult(int rc, String path, Object ctx, String name); - } - - public static abstract class VoidCallback implements AsyncCallback.VoidCallback { - public void processResult(int rc, String path, Object ctx) { - try { - safeProcessResult(rc, path, ctx); - } catch (Throwable t) { - invokeUncaughtExceptionHandler(t); - } - } - - public abstract void safeProcessResult(int rc, String path, Object ctx); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/ZkUtils.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/ZkUtils.java b/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/ZkUtils.java deleted file mode 100644 index d3dcfd6..0000000 --- a/hedwig-server/src/main/java/org/apache/hedwig/zookeeper/ZkUtils.java +++ /dev/null @@ -1,117 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.zookeeper; - -import java.io.IOException; -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.zookeeper.AsyncCallback; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.KeeperException.Code; -import org.apache.zookeeper.data.ACL; - -import org.apache.hedwig.util.PathUtils; - -public class ZkUtils { - - private static final Logger logger = LoggerFactory.getLogger(ZkUtils.class); - - static class SyncObject { - int rc; - String path; - boolean called = false; - } - - public static void createFullPathOptimistic(final ZooKeeper zk, final String originalPath, final byte[] data, - final List<ACL> acl, final CreateMode createMode) - throws KeeperException, IOException, InterruptedException { - final SyncObject syncObj = new SyncObject(); - - createFullPathOptimistic( - zk, originalPath, data, acl, createMode, - new SafeAsyncZKCallback.StringCallback() { - @Override - public void safeProcessResult(final int rc, String path, Object ctx, String name) { - synchronized (syncObj) { - syncObj.rc = rc; - syncObj.path = path; - syncObj.called = true; - syncObj.notify(); - } - } - }, syncObj - ); - - synchronized (syncObj) { - while (!syncObj.called) { - syncObj.wait(); - } - } - - if (Code.OK.intValue() != syncObj.rc) { - throw KeeperException.create(KeeperException.Code.get(syncObj.rc), syncObj.path); - } - } - - public static void createFullPathOptimistic(final ZooKeeper zk, final String originalPath, final byte[] data, - final List<ACL> acl, final CreateMode createMode, final AsyncCallback.StringCallback callback, - final Object ctx) { - - zk.create(originalPath, data, acl, createMode, new SafeAsyncZKCallback.StringCallback() { - @Override - public void safeProcessResult(int rc, String path, Object ctx, String name) { - - if (rc != Code.NONODE.intValue()) { - callback.processResult(rc, path, ctx, name); - return; - } - - // Since I got a nonode, it means that my parents don't exist - // create mode is persistent since ephemeral nodes can't be - // parents - ZkUtils.createFullPathOptimistic(zk, PathUtils.parent(originalPath), new byte[0], acl, - CreateMode.PERSISTENT, new SafeAsyncZKCallback.StringCallback() { - - @Override - public void safeProcessResult(int rc, String path, Object ctx, String name) { - if (rc == Code.OK.intValue() || rc == Code.NODEEXISTS.intValue()) { - // succeeded in creating the parent, now - // create the original path - ZkUtils.createFullPathOptimistic(zk, originalPath, data, acl, createMode, callback, - ctx); - } else { - callback.processResult(rc, path, ctx, name); - } - } - }, ctx); - } - }, ctx); - - } - - public static KeeperException logErrorAndCreateZKException(String msg, String path, int rc) { - KeeperException ke = KeeperException.create(Code.get(rc), path); - logger.error(msg + ",zkPath: " + path, ke); - return ke; - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/resources/LICENSE.bin.txt ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/resources/LICENSE.bin.txt b/hedwig-server/src/main/resources/LICENSE.bin.txt deleted file mode 100644 index 1299434..0000000 --- a/hedwig-server/src/main/resources/LICENSE.bin.txt +++ /dev/null @@ -1,302 +0,0 @@ - - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - - END OF TERMS AND CONDITIONS - - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - - Copyright [yyyy] [name of copyright owner] - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - ------------------------------------------------------------------------------------- -For lib/slf4j-*.jar - - Copyright (c) 2004-2011 QOS.ch - All rights reserved. - - Permission is hereby granted, free of charge, to any person obtaining - a copy of this software and associated documentation files (the - "Software"), to deal in the Software without restriction, including - without limitation the rights to use, copy, modify, merge, publish, - distribute, sublicense, and/or sell copies of the Software, and to - permit persons to whom the Software is furnished to do so, subject to - the following conditions: - - The above copyright notice and this permission notice shall be - included in all copies or substantial portions of the Software. - - THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, - EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF - MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND - NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE - LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION - OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION - WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - ------------------------------------------------------------------------------------- -For lib/protobuf-java-*.jar - -Copyright 2008, Google Inc. -All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are -met: - - * Redistributions of source code must retain the above copyright -notice, this list of conditions and the following disclaimer. - * Redistributions in binary form must reproduce the above -copyright notice, this list of conditions and the following disclaimer -in the documentation and/or other materials provided with the -distribution. - * Neither the name of Google Inc. nor the names of its -contributors may be used to endorse or promote products derived from -this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -Code generated by the Protocol Buffer compiler is owned by the owner -of the input file used when generating it. This code is not -standalone and requires a support library to be linked with it. This -support library is itself covered by the above license. - ------------------------------------------------------------------------------------- -For lib/jline-*.jar - -Copyright (c) 2002-2006, Marc Prud'hommeaux <[email protected]> -All rights reserved. - -Redistribution and use in source and binary forms, with or -without modification, are permitted provided that the following -conditions are met: - -Redistributions of source code must retain the above copyright -notice, this list of conditions and the following disclaimer. - -Redistributions in binary form must reproduce the above copyright -notice, this list of conditions and the following disclaimer -in the documentation and/or other materials provided with -the distribution. - -Neither the name of JLine nor the names of its contributors -may be used to endorse or promote products derived from this -software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, -BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY -AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO -EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE -FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, -OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED -AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING -IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED -OF THE POSSIBILITY OF SUCH DAMAGE. - - http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/resources/NOTICE.bin.txt ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/resources/NOTICE.bin.txt b/hedwig-server/src/main/resources/NOTICE.bin.txt deleted file mode 100644 index 81c576e..0000000 --- a/hedwig-server/src/main/resources/NOTICE.bin.txt +++ /dev/null @@ -1,40 +0,0 @@ -Apache BookKeeper -Copyright 2011-2015 The Apache Software Foundation - -Licensed under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at: - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on -an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. - -This project includes: - Apache Log4j under The Apache Software License, Version 2.0 - BookKeeper Server under The Apache Software License, Version 2.0 - Commons BeanUtils Core under The Apache Software License, Version 2.0 - Commons CLI under The Apache Software License, Version 2.0 - Commons Codec under The Apache Software License, Version 2.0 - Commons Collections under The Apache Software License, Version 2.0 - Commons Configuration under The Apache Software License, Version 2.0 - Commons IO under The Apache Software License, Version 2.0 - Commons Lang under The Apache Software License, Version 2.0 - Commons Logging under The Apache Software License, Version 2.0 - commons-beanutils under Apache License, Version 2.0 - Derby Engine under Apache License, Version 2.0 - Digester under The Apache Software License, Version 2.0 - Hedwig Client under The Apache Software License, Version 2.0 - Hedwig Protocol under The Apache Software License, Version 2.0 - Java Native Access under Apache License, Version 2.0 - JLine under BSD - Protocol Buffer Java API under New BSD license - SLF4J API Module under MIT License - SLF4J LOG4J-12 Binding under MIT License - The Netty Project under Apache License, Version 2.0 - ZooKeeper under Apache License, Version 2.0 - Guava under The Apache Software License, Version 2.0 http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/resources/findbugsExclude.xml ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/resources/findbugsExclude.xml b/hedwig-server/src/main/resources/findbugsExclude.xml deleted file mode 100644 index 9d6d431..0000000 --- a/hedwig-server/src/main/resources/findbugsExclude.xml +++ /dev/null @@ -1,25 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -//--> -<FindBugsFilter> - <Match> - <Class name="org.apache.hedwig.server.persistence.LocalDBPersistenceManager" /> - <Method name="createTable" /> - <!-- We make is safe by hashing the input before using //--> - <Bug pattern="SQL_NONCONSTANT_STRING_PASSED_TO_EXECUTE" /> - </Match> -</FindBugsFilter> http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/resources/p12.pass ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/resources/p12.pass b/hedwig-server/src/main/resources/p12.pass deleted file mode 100644 index e7a8bf7..0000000 --- a/hedwig-server/src/main/resources/p12.pass +++ /dev/null @@ -1 +0,0 @@ -eUySvp2phM2Wk http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/resources/server.p12 ---------------------------------------------------------------------- diff --git a/hedwig-server/src/main/resources/server.p12 b/hedwig-server/src/main/resources/server.p12 deleted file mode 100644 index b7043b8..0000000 Binary files a/hedwig-server/src/main/resources/server.p12 and /dev/null differ http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/HelperMethods.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/HelperMethods.java b/hedwig-server/src/test/java/org/apache/hedwig/HelperMethods.java deleted file mode 100644 index 972e145..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/HelperMethods.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig; - -import java.util.ArrayList; -import java.util.List; -import java.util.Random; - -import com.google.protobuf.ByteString; -import org.apache.hedwig.protocol.PubSubProtocol.Message; - -public class HelperMethods { - static Random rand = new Random(); - - public static List<Message> getRandomPublishedMessages(int numMessages, int size) { - ByteString[] regions = { ByteString.copyFromUtf8("sp1"), ByteString.copyFromUtf8("re1"), - ByteString.copyFromUtf8("sg") - }; - return getRandomPublishedMessages(numMessages, size, regions); - } - - public static List<Message> getRandomPublishedMessages(int numMessages, int size, ByteString[] regions) { - List<Message> msgs = new ArrayList<Message>(); - for (int i = 0; i < numMessages; i++) { - byte[] body = new byte[size]; - rand.nextBytes(body); - msgs.add(Message.newBuilder().setBody(ByteString.copyFrom(body)).setSrcRegion( - regions[rand.nextInt(regions.length)]).build()); - } - return msgs; - } - - public static boolean areEqual(Message m1, Message m2) { - if (m1.hasSrcRegion() != m2.hasSrcRegion()) { - return false; - } - if (m1.hasSrcRegion() && !m1.getSrcRegion().equals(m2.getSrcRegion())) { - return false; - } - return m1.getBody().equals(m2.getBody()); - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/StubCallback.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/StubCallback.java b/hedwig-server/src/test/java/org/apache/hedwig/StubCallback.java deleted file mode 100644 index 89790aa..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/StubCallback.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig; - -import java.util.concurrent.SynchronousQueue; - -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.util.ConcurrencyUtils; -import org.apache.hedwig.util.Either; -import org.apache.hedwig.util.Callback; - -public class StubCallback<T> implements Callback<T> { - - public SynchronousQueue<Either<T, PubSubException>> queue = new SynchronousQueue<Either<T, PubSubException>>(); - - public void operationFailed(Object ctx, final PubSubException exception) { - new Thread(new Runnable() { - @Override - public void run() { - ConcurrencyUtils.put(queue, Either.of((T) null, exception)); - - } - }).start(); - - } - - public void operationFinished(Object ctx, final T resultOfOperation) { - new Thread(new Runnable() { - @Override - public void run() { - ConcurrencyUtils.put(queue, Either.of(resultOfOperation, (PubSubException) null)); - - } - }).start(); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/StubScanCallback.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/StubScanCallback.java b/hedwig-server/src/test/java/org/apache/hedwig/StubScanCallback.java deleted file mode 100644 index 776a7f5..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/StubScanCallback.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.hedwig.protocol.PubSubProtocol.Message; -import org.apache.hedwig.server.persistence.ScanCallback; - -public class StubScanCallback implements ScanCallback { - List<Message> messages = new ArrayList<Message>(); - boolean success = false, failed = false; - - public void messageScanned(Object ctx, Message message) { - messages.add(message); - success = true; - } - - public void scanFailed(Object ctx, Exception exception) { - failed = true; - success = false; - } - - public void scanFinished(Object ctx, ReasonForFinish reason) { - success = true; - failed = false; - } - - public List<Message> getMessages() { - return messages; - } - - public boolean isSuccess() { - return success; - } - - public boolean isFailed() { - return failed; - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java b/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java deleted file mode 100644 index c206ce4..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java +++ /dev/null @@ -1,708 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.client; - -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; - -import com.google.protobuf.ByteString; -import org.apache.hedwig.client.api.MessageHandler; -import org.apache.hedwig.client.conf.ClientConfiguration; -import org.apache.hedwig.client.api.Publisher; -import org.apache.hedwig.client.api.Subscriber; -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException; -import org.apache.hedwig.protocol.PubSubProtocol.Message; -import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId; -import org.apache.hedwig.protocol.PubSubProtocol.PublishResponse; -import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions; -import org.apache.hedwig.server.PubSubServerStandAloneTestBase; -import org.apache.hedwig.util.Callback; -import org.apache.hedwig.util.ConcurrencyUtils; -import org.apache.hedwig.util.SubscriptionListener; -import org.apache.hedwig.util.HedwigSocketAddress; - -import static org.junit.Assert.*; - -@RunWith(Parameterized.class) -public class TestPubSubClient extends PubSubServerStandAloneTestBase { - - private static final int RETENTION_SECS_VALUE = 10; - - // Client side variables - protected HedwigClient client; - protected Publisher publisher; - protected Subscriber subscriber; - - protected class RetentionServerConfiguration extends StandAloneServerConfiguration { - @Override - public boolean isStandalone() { - return true; - } - - @Override - public int getRetentionSecs() { - return RETENTION_SECS_VALUE; - } - } - - // SynchronousQueues to verify async calls - private final SynchronousQueue<Boolean> queue = new SynchronousQueue<Boolean>(); - private final SynchronousQueue<Boolean> consumeQueue = new SynchronousQueue<Boolean>(); - private final SynchronousQueue<SubscriptionEvent> eventQueue = - new SynchronousQueue<SubscriptionEvent>(); - - class TestSubscriptionListener implements SubscriptionListener { - SynchronousQueue<SubscriptionEvent> eventQueue; - public TestSubscriptionListener() { - this.eventQueue = TestPubSubClient.this.eventQueue; - } - public TestSubscriptionListener(SynchronousQueue<SubscriptionEvent> queue) { - this.eventQueue = queue; - } - @Override - public void processEvent(final ByteString topic, final ByteString subscriberId, - final SubscriptionEvent event) { - new Thread(new Runnable() { - @Override - public void run() { - logger.debug("Event {} received for subscription(topic:{}, subscriber:{})", - new Object[] { event, topic.toStringUtf8(), subscriberId.toStringUtf8() }); - ConcurrencyUtils.put(TestSubscriptionListener.this.eventQueue, event); - } - }).start(); - } - } - - // Test implementation of Callback for async client actions. - class TestCallback implements Callback<Void> { - - @Override - public void operationFinished(Object ctx, Void resultOfOperation) { - new Thread(new Runnable() { - @Override - public void run() { - if (logger.isDebugEnabled()) - logger.debug("Operation finished!"); - ConcurrencyUtils.put(queue, true); - } - }).start(); - } - - @Override - public void operationFailed(Object ctx, final PubSubException exception) { - new Thread(new Runnable() { - @Override - public void run() { - logger.error("Operation failed!", exception); - ConcurrencyUtils.put(queue, false); - } - }).start(); - } - } - - // Test implementation of subscriber's message handler. - class TestMessageHandler implements MessageHandler { - - private final SynchronousQueue<Boolean> consumeQueue; - - public TestMessageHandler() { - this.consumeQueue = TestPubSubClient.this.consumeQueue; - } - - public TestMessageHandler(SynchronousQueue<Boolean> consumeQueue) { - this.consumeQueue = consumeQueue; - } - - public void deliver(ByteString topic, ByteString subscriberId, Message msg, Callback<Void> callback, - Object context) { - new Thread(new Runnable() { - @Override - public void run() { - if (logger.isDebugEnabled()) - logger.debug("Consume operation finished successfully!"); - ConcurrencyUtils.put(TestMessageHandler.this.consumeQueue, true); - } - }).start(); - callback.operationFinished(context, null); - } - } - - @Parameters - public static Collection<Object[]> configs() { - return Arrays.asList(new Object[][] { { true }, { false } }); - } - - protected boolean isSubscriptionChannelSharingEnabled; - - public TestPubSubClient(boolean isSubscriptionChannelSharingEnabled) { - this.isSubscriptionChannelSharingEnabled = isSubscriptionChannelSharingEnabled; - } - - @Override - @Before - public void setUp() throws Exception { - super.setUp(); - client = new HedwigClient(new ClientConfiguration() { - @Override - public HedwigSocketAddress getDefaultServerHedwigSocketAddress() { - return getDefaultHedwigAddress(); - } - - @Override - public boolean isSubscriptionChannelSharingEnabled() { - return TestPubSubClient.this.isSubscriptionChannelSharingEnabled; - } - }); - publisher = client.getPublisher(); - subscriber = client.getSubscriber(); - } - - @Override - @After - public void tearDown() throws Exception { - client.close(); - super.tearDown(); - } - - @Test(timeout=60000) - public void testSyncPublish() throws Exception { - boolean publishSuccess = true; - try { - publisher.publish(ByteString.copyFromUtf8("mySyncTopic"), Message.newBuilder().setBody( - ByteString.copyFromUtf8("Hello Sync World!")).build()); - } catch (Exception e) { - publishSuccess = false; - } - assertTrue(publishSuccess); - } - - @Test(timeout=60000) - public void testSyncPublishWithResponse() throws Exception { - ByteString topic = ByteString.copyFromUtf8("testSyncPublishWithResponse"); - ByteString subid = ByteString.copyFromUtf8("mysubid"); - - final String prefix = "SyncMessage-"; - final int numMessages = 30; - - final Map<String, MessageSeqId> publishedMsgs = - new HashMap<String, MessageSeqId>(); - - final AtomicInteger numReceived = new AtomicInteger(0); - final CountDownLatch receiveLatch = new CountDownLatch(1); - final Map<String, MessageSeqId> receivedMsgs = - new HashMap<String, MessageSeqId>(); - - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - subscriber.subscribe(topic, subid, opts); - subscriber.startDelivery(topic, subid, new MessageHandler() { - synchronized public void deliver(ByteString topic, ByteString subscriberId, - Message msg, Callback<Void> callback, - Object context) { - String str = msg.getBody().toStringUtf8(); - receivedMsgs.put(str, msg.getMsgId()); - if (numMessages == numReceived.incrementAndGet()) { - receiveLatch.countDown(); - } - callback.operationFinished(context, null); - } - }); - - for (int i=0; i<numMessages; i++) { - String str = prefix + i; - ByteString data = ByteString.copyFromUtf8(str); - Message msg = Message.newBuilder().setBody(data).build(); - PublishResponse response = publisher.publish(topic, msg); - assertNotNull(response); - publishedMsgs.put(str, response.getPublishedMsgId()); - } - - assertTrue("Timed out waiting on callback for messages.", - receiveLatch.await(30, TimeUnit.SECONDS)); - assertEquals("Should be expected " + numMessages + " messages.", - numMessages, numReceived.get()); - assertEquals("Should be expected " + numMessages + " messages in map.", - numMessages, receivedMsgs.size()); - - for (int i=0; i<numMessages; i++) { - final String str = prefix + i; - MessageSeqId pubId = publishedMsgs.get(str); - MessageSeqId revId = receivedMsgs.get(str); - assertTrue("Doesn't receive same message seq id for " + str, - pubId.equals(revId)); - } - } - - @Test(timeout=60000) - public void testAsyncPublish() throws Exception { - publisher.asyncPublish(ByteString.copyFromUtf8("myAsyncTopic"), Message.newBuilder().setBody( - ByteString.copyFromUtf8("Hello Async World!")).build(), new TestCallback(), null); - assertTrue(queue.take()); - } - - @Test(timeout=60000) - public void testAsyncPublishWithResponse() throws Exception { - ByteString topic = ByteString.copyFromUtf8("testAsyncPublishWithResponse"); - ByteString subid = ByteString.copyFromUtf8("mysubid"); - - final String prefix = "AsyncMessage-"; - final int numMessages = 30; - - final AtomicInteger numPublished = new AtomicInteger(0); - final CountDownLatch publishLatch = new CountDownLatch(1); - final Map<String, MessageSeqId> publishedMsgs = - new HashMap<String, MessageSeqId>(); - - final AtomicInteger numReceived = new AtomicInteger(0); - final CountDownLatch receiveLatch = new CountDownLatch(1); - final Map<String, MessageSeqId> receivedMsgs = - new HashMap<String, MessageSeqId>(); - - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - subscriber.subscribe(topic, subid, opts); - subscriber.startDelivery(topic, subid, new MessageHandler() { - synchronized public void deliver(ByteString topic, ByteString subscriberId, - Message msg, Callback<Void> callback, - Object context) { - String str = msg.getBody().toStringUtf8(); - receivedMsgs.put(str, msg.getMsgId()); - if (numMessages == numReceived.incrementAndGet()) { - receiveLatch.countDown(); - } - callback.operationFinished(context, null); - } - }); - - for (int i=0; i<numMessages; i++) { - final String str = prefix + i; - ByteString data = ByteString.copyFromUtf8(str); - Message msg = Message.newBuilder().setBody(data).build(); - publisher.asyncPublishWithResponse(topic, msg, new Callback<PublishResponse>() { - @Override - public void operationFinished(Object ctx, PublishResponse response) { - publishedMsgs.put(str, response.getPublishedMsgId()); - if (numMessages == numPublished.incrementAndGet()) { - publishLatch.countDown(); - } - } - @Override - public void operationFailed(Object ctx, final PubSubException exception) { - publishLatch.countDown(); - } - }, null); - } - assertTrue("Timed out waiting on callback for publish requests.", - publishLatch.await(10, TimeUnit.SECONDS)); - assertEquals("Should be expected " + numMessages + " publishes.", - numMessages, numPublished.get()); - assertEquals("Should be expected " + numMessages + " publishe responses.", - numMessages, publishedMsgs.size()); - - assertTrue("Timed out waiting on callback for messages.", - receiveLatch.await(30, TimeUnit.SECONDS)); - assertEquals("Should be expected " + numMessages + " messages.", - numMessages, numReceived.get()); - assertEquals("Should be expected " + numMessages + " messages in map.", - numMessages, receivedMsgs.size()); - - for (int i=0; i<numMessages; i++) { - final String str = prefix + i; - MessageSeqId pubId = publishedMsgs.get(str); - MessageSeqId revId = receivedMsgs.get(str); - assertTrue("Doesn't receive same message seq id for " + str, - pubId.equals(revId)); - } - } - - @Test(timeout=60000) - public void testMultipleAsyncPublish() throws Exception { - ByteString topic1 = ByteString.copyFromUtf8("myFirstTopic"); - ByteString topic2 = ByteString.copyFromUtf8("myNewTopic"); - - publisher.asyncPublish(topic1, Message.newBuilder().setBody(ByteString.copyFromUtf8("Hello World!")).build(), - new TestCallback(), null); - assertTrue(queue.take()); - publisher.asyncPublish(topic2, Message.newBuilder().setBody(ByteString.copyFromUtf8("Hello on new topic!")) - .build(), new TestCallback(), null); - assertTrue(queue.take()); - publisher.asyncPublish(topic1, Message.newBuilder().setBody( - ByteString.copyFromUtf8("Hello Again on old topic!")).build(), new TestCallback(), null); - assertTrue(queue.take()); - } - - @Test(timeout=60000) - public void testSyncSubscribe() throws Exception { - boolean subscribeSuccess = true; - try { - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - subscriber.subscribe(ByteString.copyFromUtf8("mySyncSubscribeTopic"), ByteString.copyFromUtf8("1"), opts); - } catch (Exception e) { - subscribeSuccess = false; - } - assertTrue(subscribeSuccess); - } - - @Test(timeout=60000) - public void testAsyncSubscribe() throws Exception { - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - subscriber.asyncSubscribe(ByteString.copyFromUtf8("myAsyncSubscribeTopic"), ByteString.copyFromUtf8("1"), - opts, new TestCallback(), null); - assertTrue(queue.take()); - } - - @Test(timeout=60000) - public void testStartDeliveryAfterCloseSub() throws Exception { - ByteString topic = ByteString.copyFromUtf8("testStartDeliveryAfterCloseSub"); - ByteString subid = ByteString.copyFromUtf8("mysubid"); - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - subscriber.subscribe(topic, subid, opts); - - // Start delivery for the subscriber - subscriber.startDelivery(topic, subid, new TestMessageHandler()); - - // Now publish some messages for the topic to be consumed by the - // subscriber. - publisher.publish(topic, Message.newBuilder() - .setBody(ByteString.copyFromUtf8("Message #1")).build()); - assertTrue(consumeQueue.take()); - - // Close subscriber for the subscriber - subscriber.closeSubscription(topic, subid); - - // subscribe again - subscriber.subscribe(topic, subid, opts); - subscriber.startDelivery(topic, subid, new TestMessageHandler()); - - publisher.publish(topic, Message.newBuilder() - .setBody(ByteString.copyFromUtf8("Message #2")).build()); - assertTrue(consumeQueue.take()); - } - - @Test(timeout=60000) - public void testSubscribeAndConsume() throws Exception { - ByteString topic = ByteString.copyFromUtf8("myConsumeTopic"); - ByteString subscriberId = ByteString.copyFromUtf8("1"); - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - - subscriber.asyncSubscribe(topic, subscriberId, opts, new TestCallback(), null); - assertTrue(queue.take()); - - // Start delivery for the subscriber - subscriber.startDelivery(topic, subscriberId, new TestMessageHandler()); - - // Now publish some messages for the topic to be consumed by the - // subscriber. - publisher.asyncPublish(topic, Message.newBuilder().setBody(ByteString.copyFromUtf8("Message #1")).build(), - new TestCallback(), null); - assertTrue(queue.take()); - assertTrue(consumeQueue.take()); - publisher.asyncPublish(topic, Message.newBuilder().setBody(ByteString.copyFromUtf8("Message #2")).build(), - new TestCallback(), null); - assertTrue(queue.take()); - assertTrue(consumeQueue.take()); - publisher.asyncPublish(topic, Message.newBuilder().setBody(ByteString.copyFromUtf8("Message #3")).build(), - new TestCallback(), null); - assertTrue(queue.take()); - assertTrue(consumeQueue.take()); - publisher.asyncPublish(topic, Message.newBuilder().setBody(ByteString.copyFromUtf8("Message #4")).build(), - new TestCallback(), null); - assertTrue(queue.take()); - assertTrue(consumeQueue.take()); - publisher.asyncPublish(topic, Message.newBuilder().setBody(ByteString.copyFromUtf8("Message #5")).build(), - new TestCallback(), null); - assertTrue(queue.take()); - assertTrue(consumeQueue.take()); - } - - @Test(timeout=60000) - public void testAsyncSubscribeAndUnsubscribe() throws Exception { - ByteString topic = ByteString.copyFromUtf8("myAsyncUnsubTopic"); - ByteString subscriberId = ByteString.copyFromUtf8("1"); - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - subscriber.asyncSubscribe(topic, subscriberId, opts, new TestCallback(), null); - assertTrue(queue.take()); - subscriber.asyncUnsubscribe(topic, subscriberId, new TestCallback(), null); - assertTrue(queue.take()); - } - - @Test(timeout=60000) - public void testSyncUnsubscribeWithoutSubscription() throws Exception { - boolean unsubscribeSuccess = false; - try { - subscriber.unsubscribe(ByteString.copyFromUtf8("mySyncUnsubTopic"), ByteString.copyFromUtf8("1")); - } catch (ClientNotSubscribedException e) { - unsubscribeSuccess = true; - } catch (Exception ex) { - unsubscribeSuccess = false; - } - assertTrue(unsubscribeSuccess); - } - - @Test(timeout=60000) - public void testAsyncSubscribeAndCloseSubscription() throws Exception { - ByteString topic = ByteString.copyFromUtf8("myAsyncSubAndCloseSubTopic"); - ByteString subscriberId = ByteString.copyFromUtf8("1"); - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - subscriber.asyncSubscribe(topic, subscriberId, opts, new TestCallback(), null); - assertTrue(queue.take()); - subscriber.closeSubscription(topic, subscriberId); - assertTrue(true); - } - - @Test(timeout=60000) - public void testSubClosesubAndPublish() throws Exception { - ByteString topic = ByteString.copyFromUtf8("mySubClosesubAndPublish"); - ByteString subid = ByteString.copyFromUtf8("mysub"); - // to populate startServing/stopServing sequeuence - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - for (int i=0; i<5; i++) { - subscriber.subscribe(topic, subid, opts); - subscriber.closeSubscription(topic, subid); - } - subscriber.subscribe(topic, subid, opts); - subscriber.startDelivery(topic, subid, new TestMessageHandler()); - for (int i=0; i<3; i++) { - publisher.asyncPublish(topic, - Message.newBuilder().setBody(ByteString.copyFromUtf8("Message #" + i)).build(), - new TestCallback(), null); - assertTrue(queue.take()); - assertTrue(consumeQueue.take()); - } - } - - @Test(timeout=60000) - public void testSyncSubscribeWithListener() throws Exception { - ByteString topic = ByteString.copyFromUtf8("mySyncSubscribeWithListener"); - ByteString subscriberId = ByteString.copyFromUtf8("mysub"); - subscriber.addSubscriptionListener(new TestSubscriptionListener()); - try { - SubscriptionOptions options = - SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH) - .setEnableResubscribe(false).build(); - subscriber.subscribe(topic, subscriberId, options); - } catch (PubSubException.ServiceDownException e) { - fail("Should not reach here!"); - } - subscriber.startDelivery(topic, subscriberId, new TestMessageHandler()); - tearDownHubServer(); - assertEquals(SubscriptionEvent.TOPIC_MOVED, eventQueue.take()); - } - - @Test(timeout=60000) - public void testAsyncSubscribeWithListener() throws Exception { - ByteString topic = ByteString.copyFromUtf8("myAsyncSubscribeWithListener"); - ByteString subscriberId = ByteString.copyFromUtf8("mysub"); - subscriber.addSubscriptionListener(new TestSubscriptionListener()); - SubscriptionOptions options = - SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH) - .setEnableResubscribe(false).build(); - subscriber.asyncSubscribe(topic, subscriberId, options, - new TestCallback(), null); - assertTrue(queue.take()); - subscriber.startDelivery(topic, subscriberId, new TestMessageHandler()); - tearDownHubServer(); - assertEquals(SubscriptionEvent.TOPIC_MOVED, eventQueue.take()); - } - - @Test(timeout=60000) - public void testSyncSubscribeForceAttach() throws Exception { - ByteString topic = ByteString.copyFromUtf8("mySyncSubscribeForceAttach"); - ByteString subscriberId = ByteString.copyFromUtf8("mysub"); - subscriber.addSubscriptionListener(new TestSubscriptionListener()); - SubscriptionOptions options = - SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH) - .setForceAttach(true).setEnableResubscribe(false).build(); - try { - subscriber.subscribe(topic, subscriberId, options); - } catch (PubSubException.ServiceDownException e) { - fail("Should not reach here!"); - } - subscriber.startDelivery(topic, subscriberId, new TestMessageHandler()); - - // new a client - HedwigClient client2 = new HedwigClient(new ClientConfiguration() { - @Override - public HedwigSocketAddress getDefaultServerHedwigSocketAddress() { - return getDefaultHedwigAddress(); - } - }); - Subscriber subscriber2 = client2.getSubscriber(); - Publisher publisher2 = client2.getPublisher(); - SynchronousQueue<SubscriptionEvent> eventQueue2 = - new SynchronousQueue<SubscriptionEvent>(); - subscriber2.addSubscriptionListener(new TestSubscriptionListener(eventQueue2)); - try { - subscriber2.subscribe(topic, subscriberId, options); - } catch (PubSubException.ServiceDownException e) { - fail("Should not reach here!"); - } - - SynchronousQueue<Boolean> consumeQueue2 = new SynchronousQueue<Boolean>(); - subscriber2.startDelivery(topic, subscriberId, new TestMessageHandler(consumeQueue2)); - - assertEquals(SubscriptionEvent.SUBSCRIPTION_FORCED_CLOSED, eventQueue.take()); - assertTrue(eventQueue2.isEmpty()); - - // Now publish some messages for the topic to be consumed by the - // subscriber. - publisher.asyncPublish(topic, Message.newBuilder().setBody(ByteString.copyFromUtf8("Message #1")).build(), - new TestCallback(), null); - assertTrue(queue.take()); - assertTrue(consumeQueue2.take()); - assertTrue(consumeQueue.isEmpty()); - - publisher2.asyncPublish(topic, Message.newBuilder().setBody(ByteString.copyFromUtf8("Message #2")).build(), - new TestCallback(), null); - assertTrue(queue.take()); - assertTrue(consumeQueue2.take()); - assertTrue(consumeQueue.isEmpty()); - - client2.close(); - } - - @Test(timeout=60000) - public void testSyncSubscribeWithListenerWhenReleasingTopic() throws Exception { - client.close(); - - tearDownHubServer(); - startHubServer(new RetentionServerConfiguration()); - client = new HedwigClient(new ClientConfiguration() { - @Override - public HedwigSocketAddress getDefaultServerHedwigSocketAddress() { - return getDefaultHedwigAddress(); - } - - @Override - public boolean isSubscriptionChannelSharingEnabled() { - return TestPubSubClient.this.isSubscriptionChannelSharingEnabled; - } - }); - publisher = client.getPublisher(); - subscriber = client.getSubscriber(); - - ByteString topic = ByteString.copyFromUtf8("mySyncSubscribeWithListenerWhenReleasingTopic"); - ByteString subscriberId = ByteString.copyFromUtf8("mysub"); - subscriber.addSubscriptionListener(new TestSubscriptionListener()); - SubscriptionOptions options = - SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH) - .setForceAttach(false).setEnableResubscribe(false).build(); - try { - subscriber.subscribe(topic, subscriberId, options); - } catch (PubSubException.ServiceDownException e) { - fail("Should not reach here!"); - } - subscriber.startDelivery(topic, subscriberId, new TestMessageHandler()); - - publisher.asyncPublish(topic, Message.newBuilder().setBody(ByteString.copyFromUtf8("Message #1")).build(), - new TestCallback(), null); - assertTrue(queue.take()); - assertTrue(consumeQueue.take()); - - Thread.sleep(RETENTION_SECS_VALUE * 2); - assertEquals(SubscriptionEvent.TOPIC_MOVED, eventQueue.take()); - } - - @Test(timeout = 60000) - public void testCloseSubscribeDuringResubscribe() throws Exception { - client.close(); - - final long reconnectWaitTime = 2000L; - client = new HedwigClient(new ClientConfiguration() { - @Override - public HedwigSocketAddress getDefaultServerHedwigSocketAddress() { - return getDefaultHedwigAddress(); - } - - @Override - public boolean isSubscriptionChannelSharingEnabled() { - return TestPubSubClient.this.isSubscriptionChannelSharingEnabled; - } - - @Override - public long getSubscribeReconnectRetryWaitTime() { - return reconnectWaitTime; - } - }); - - publisher = client.getPublisher(); - subscriber = client.getSubscriber(); - - ByteString topic = ByteString.copyFromUtf8("testCloseSubscribeDuringResubscribe"); - ByteString subscriberId = ByteString.copyFromUtf8("mysub"); - subscriber.addSubscriptionListener(new TestSubscriptionListener()); - SubscriptionOptions options = - SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH) - .setForceAttach(false).setEnableResubscribe(true).build(); - subscriber.subscribe(topic, subscriberId, options); - logger.info("Subscribed topic {}, subscriber {}.", topic.toStringUtf8(), - subscriberId.toStringUtf8()); - subscriber.startDelivery(topic, subscriberId, new TestMessageHandler()); - - // tear down the hub server to let subscribe enter - tearDownHubServer(); - logger.info("Tear down the hub server"); - - // wait for client enter to resubscribe logic - Thread.sleep(reconnectWaitTime / 2); - - // close sub - subscriber.closeSubscription(topic, subscriberId); - - // start the hub server again - startHubServer(conf); - - // publish a new message - publisher.asyncPublish(topic, Message.newBuilder().setBody(ByteString.copyFromUtf8("Message #1")).build(), - new TestCallback(), null); - assertTrue(queue.take()); - - // wait for another reconnect time period - assertNull("Should not receive any messages since the subscription has already been closed.", - consumeQueue.poll(reconnectWaitTime + reconnectWaitTime / 2, TimeUnit.MILLISECONDS)); - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/client/TestSubAfterCloseSub.java ---------------------------------------------------------------------- diff --git a/hedwig-server/src/test/java/org/apache/hedwig/client/TestSubAfterCloseSub.java b/hedwig-server/src/test/java/org/apache/hedwig/client/TestSubAfterCloseSub.java deleted file mode 100644 index 6fa3407..0000000 --- a/hedwig-server/src/test/java/org/apache/hedwig/client/TestSubAfterCloseSub.java +++ /dev/null @@ -1,209 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.client; - -import java.io.IOException; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import org.apache.hedwig.client.api.MessageHandler; -import org.apache.hedwig.client.api.Publisher; -import org.apache.hedwig.client.api.Subscriber; -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.protocol.PubSubProtocol.Message; -import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions; -import org.apache.hedwig.server.HedwigHubTestBase; -import org.apache.hedwig.server.delivery.DeliveryManager; -import org.apache.hedwig.server.delivery.FIFODeliveryManager; -import org.apache.hedwig.server.netty.PubSubServer; -import org.apache.hedwig.util.Callback; -import org.junit.Test; - -import com.google.protobuf.ByteString; - -import static org.junit.Assert.*; - -public class TestSubAfterCloseSub extends HedwigHubTestBase { - - class TestClientConfiguration extends HubClientConfiguration { - - boolean isSubscriptionChannelSharingEnabled; - - TestClientConfiguration(boolean isSubscriptionChannelSharingEnabled) { - this.isSubscriptionChannelSharingEnabled = isSubscriptionChannelSharingEnabled; - } - - @Override - public boolean isSubscriptionChannelSharingEnabled() { - return isSubscriptionChannelSharingEnabled; - } - } - - private void sleepDeliveryManager(final CountDownLatch wakeupLatch) - throws IOException { - PubSubServer server = serversList.get(0); - assertNotNull("There should be at least one pubsub server", server); - DeliveryManager dm = server.getDeliveryManager(); - assertNotNull("Delivery manager should not be null once server has started", dm); - assertTrue("Delivery manager is wrong type", dm instanceof FIFODeliveryManager); - final FIFODeliveryManager fdm = (FIFODeliveryManager)dm; - - Thread sleeper = new Thread() { - @Override - public void run() { - try { - fdm.suspendProcessing(); - wakeupLatch.await(); - fdm.resumeProcessing(); - } catch (Exception e) { - logger.error("Error suspending delivery manager", e); - } - } - }; - sleeper.start(); - } - - /** - * {@link https://issues.apache.org/jira/browse/BOOKKEEPER-507} - */ - /* TODO: Add this test case back after BOOKKEEPER-37 is fixed - @Test(timeout=15000) - public void testSubAfterCloseSubForSimpleClient() throws Exception { - runSubAfterCloseSubTest(false); - } - */ - - /** - * {@link https://issues.apache.org/jira/browse/BOOKKEEPER-507} - */ - @Test(timeout=15000) - public void testSubAfterCloseSubForMultiplexClient() throws Exception { - runSubAfterCloseSubTest(true); - } - - private void runSubAfterCloseSubTest(boolean sharedSubscriptionChannel) throws Exception { - HedwigClient client = new HedwigClient(new TestClientConfiguration(sharedSubscriptionChannel)); - Publisher publisher = client.getPublisher(); - final Subscriber subscriber = client.getSubscriber(); - - final ByteString topic = ByteString.copyFromUtf8("TestSubAfterCloseSub-" + sharedSubscriptionChannel); - final ByteString subid = ByteString.copyFromUtf8("mysub"); - - final CountDownLatch wakeupLatch = new CountDownLatch(1); - final CountDownLatch closeLatch = new CountDownLatch(1); - final CountDownLatch subLatch = new CountDownLatch(1); - final CountDownLatch deliverLatch = new CountDownLatch(1); - - try { - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - subscriber.subscribe(topic, subid, opts); - sleepDeliveryManager(wakeupLatch); - subscriber.asyncCloseSubscription(topic, subid, new Callback<Void>() { - @Override - public void operationFinished(Object ctx, Void resultOfOperation) { - closeLatch.countDown(); - } - @Override - public void operationFailed(Object ctx, PubSubException exception) { - logger.error("Closesub failed : ", exception); - } - }, null); - - SubscriptionOptions optsAttach = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.ATTACH).build(); - subscriber.asyncSubscribe(topic, subid, optsAttach, new Callback<Void>() { - @Override - public void operationFinished(Object ctx, Void resultOfOperation) { - try { - subscriber.startDelivery(topic, subid, new MessageHandler() { - @Override - public void deliver(ByteString topic, ByteString subid, Message msg, - Callback<Void> callback, Object context) { - deliverLatch.countDown(); - } - }); - } catch (Exception cnse) { - logger.error("Failed to start delivery : ", cnse); - } - subLatch.countDown(); - } - @Override - public void operationFailed(Object ctx, PubSubException exception) { - logger.error("Failed to subscriber : ", exception); - } - }, null); - // Make the delivery manager thread sleep for a while. - // Before {@link https://issues.apache.org/jira/browse/BOOKKEEPER-507}, - // subscribe would succeed before closesub, while closesub would clear - // a successful subscription w/o notifying the client. - TimeUnit.SECONDS.sleep(2); - // wake up fifo delivery thread - wakeupLatch.countDown(); - // wait close sub to succeed - assertTrue("Async close subscription should succeed.", - closeLatch.await(5, TimeUnit.SECONDS)); - assertTrue("Subscribe should succeed.", - subLatch.await(5, TimeUnit.SECONDS)); - // publish a message - publisher.publish(topic, Message.newBuilder().setBody(topic).build()); - // wait for seconds to receive message - assertTrue("Message should be received through successful subscription.", - deliverLatch.await(5, TimeUnit.SECONDS)); - } finally { - client.close(); - } - } - - /** - * Test that if we close a subscription and open again immediately, we don't - * get a TOPIC_BUSY. This race existed because the simple client simply closed - * the connection when closing a subscription, and another client could try to - * attach to the subscription before the channel disconnected event occurs. - * - * {@link https://issues.apache.org/jira/browse/BOOKKEEPER-513} - */ - @Test(timeout=15000) - public void testSimpleClientDoesntGetTopicBusy() throws Exception { - // run ten times to increase chance of hitting race - for (int i = 0; i < 10; i++) { - HedwigClient client1 = new HedwigClient(new TestClientConfiguration(false)); - Subscriber subscriber1 = client1.getSubscriber(); - HedwigClient client2 = new HedwigClient(new TestClientConfiguration(false)); - Subscriber subscriber2 = client2.getSubscriber(); - - final ByteString topic = ByteString.copyFromUtf8("TestSimpleClientTopicBusy"); - final ByteString subid = ByteString.copyFromUtf8("mysub"); - - SubscriptionOptions opts1 = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - subscriber1.subscribe(topic, subid, opts1); - subscriber1.closeSubscription(topic, subid); - - SubscriptionOptions opts2 = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.ATTACH).build(); - subscriber2.subscribe(topic, subid, opts2); - subscriber2.closeSubscription(topic, subid); - - client1.close(); - client2.close(); - } - } -} -
