http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/package-info.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/package-info.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/package-info.java deleted file mode 100644 index 07236a5..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/package-info.java +++ /dev/null @@ -1,89 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * A Netty based Hedwig client implementation. - * - * <h3>Components</h3> - * - * The netty based implementation contains the following components: - * <ul> - * <li>{@link HChannel}: An interface wrapper of netty {@link org.jboss.netty.channel.Channel} - * to submit hedwig's {@link org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest}s - * to target host.</li> - * <li>{@link HChannelHandler}: A wrapper of netty {@link org.jboss.netty.channel.ChannelHandler} - * to handle events of its underlying netty channel, such as responses received, channel - * disconnected, etc. 
A {@link HChannelHandler} is bound with a {@link HChannel}.</li> - * <li>{@link HChannelManager}: A manager that manages all established {@link HChannel}s. - * It provides a clean interface for publisher/subscriber to send - * {@link org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest}s</li> - * </ul> - * - * <h3>Main Flow</h3> - * - * <ul> - * <li>{@link HedwigPublisher}/{@link HedwigSubscriber} delegates {@link HChannelManager} - * to submit pub/sub requests.</li> - * <li>{@link HChannelManager} finds the owner hubs, establishes a {@link HChannel} to hub servers - * and sends the requests to them.</li> - * <li>{@link HChannelHandler} dispatches responses to target - * {@link org.apache.hedwig.client.handlers.AbstractResponseHandler} to process.</li> - * <li>{@link HChannelHandler} detects an underlying netty {@link org.jboss.netty.channel.Channel} - * disconnected. It calls {@link HChannelManager} to clear cached {@link HChannel} that - * it is bound with. For non-subscription channels, it would fail all pending requests; - * For subscription channels, it would fail all pending requests and retry to reconnect - * those successful subscriptions.</li> - * </ul> - * - * <h3>HChannel</h3> - * - * Two kinds of {@link HChannel}s are provided in the current implementation. {@link HChannelImpl} - * provides the ability to multiplex pub/sub requests in an underlying netty - * {@link org.jboss.netty.channel.Channel}, while {@link DefaultServerChannel} provides the - * ability to establish a netty channel {@link org.jboss.netty.channel.Channel} for a pub/sub - * request. After the underlying netty channel is established, it would be converted into - * a {@link HChannelImpl} by {@link HChannelManager#submitOpThruChannel(pubSubData, channel)}. - * - * Although {@link HChannelImpl} provides multiplexing ability, it still could be used for - * one-channel-per-subscription case, which just sends one subscribe request thru the - * underlying channel. 
- * - * <h3>HChannelHandler</h3> - * - * {@link HChannelHandler} is a generic netty {@link org.jboss.netty.channel.ChannelHandler}, - * which handles events from the underlying channel. A <i>HChannelHandler</i> is bound with - * a {@link HChannel} as a channel pipeline when the underlying channel is established. It - * takes the responsibility of dispatching response to target response handler. For a - * non-subscription channel, it just handles <b>PUBLISH</b> and <b>UNSUBSCRIBE</b> responses. - * For a subscription channel, it handles <b>SUBSCRIBE</b> response. For consume requests, - * we treat them in a fire-and-forget way, so they do not need to be handled by any response - * handler. - * - * <h3>HChannelManager</h3> - * - * {@link HChannelManager} manages all outstanding connections to target hub servers for a client. - * Since a subscription channel acts quite differently from a non-subscription channel, the basic - * implementation {@link AbstractHChannelManager} manages non-subscription channels and - * subscription channels in different channel sets. Currently hedwig client provides - * {@link SimpleHChannelManager} which manages subscription channels in a one-channel-per-subscription - * way. In the future, if we want to multiplex multiple subscriptions in one channel, we just need - * to provide a multiplexing version of {@link AbstractHChannelManager} which manages channels - * in a multiplexing way, and a multiplexing version of {@link org.apache.hedwig.client.handlers.SubscribeResponseHandler} - * which handles multiple subscriptions in one channel. - */ -package org.apache.hedwig.client.netty;
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/ssl/SslClientContextFactory.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/ssl/SslClientContextFactory.java b/hedwig-client/src/main/java/org/apache/hedwig/client/ssl/SslClientContextFactory.java deleted file mode 100644 index ee488ba..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/ssl/SslClientContextFactory.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.client.ssl; - -import javax.net.ssl.SSLContext; - -import org.apache.hedwig.client.conf.ClientConfiguration; - -public class SslClientContextFactory extends SslContextFactory { - - public SslClientContextFactory(ClientConfiguration cfg) { - try { - // Create the SSL context. 
- ctx = SSLContext.getInstance("TLS"); - ctx.init(null, getTrustManagers(), null); - } catch (Exception ex) { - throw new RuntimeException(ex); - } - } - - @Override - protected boolean isClient() { - return true; - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/ssl/SslContextFactory.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/ssl/SslContextFactory.java b/hedwig-client/src/main/java/org/apache/hedwig/client/ssl/SslContextFactory.java deleted file mode 100644 index 33c5a53..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/ssl/SslContextFactory.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hedwig.client.ssl; - -import java.security.cert.CertificateException; -import java.security.cert.X509Certificate; - -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; -import javax.net.ssl.TrustManager; -import javax.net.ssl.X509TrustManager; - -public abstract class SslContextFactory { - - protected SSLContext ctx; - - public SSLContext getContext() { - return ctx; - } - - protected abstract boolean isClient(); - - public SSLEngine getEngine() { - SSLEngine engine = ctx.createSSLEngine(); - engine.setUseClientMode(isClient()); - return engine; - } - - protected TrustManager[] getTrustManagers() { - return new TrustManager[] { new X509TrustManager() { - // Always trust, even if invalid. - - @Override - public X509Certificate[] getAcceptedIssuers() { - return new X509Certificate[0]; - } - - @Override - public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException { - // Always trust. - } - - @Override - public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException { - // Always trust. - } - } - }; - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/conf/AbstractConfiguration.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/conf/AbstractConfiguration.java b/hedwig-client/src/main/java/org/apache/hedwig/conf/AbstractConfiguration.java deleted file mode 100644 index 57232ba..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/conf/AbstractConfiguration.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.conf; - -import java.net.URL; - -import org.apache.commons.configuration.CompositeConfiguration; -import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.PropertiesConfiguration; - -public abstract class AbstractConfiguration { - protected CompositeConfiguration conf; - - protected AbstractConfiguration() { - conf = new CompositeConfiguration(); - } - - /** - * Return real configuration object - * - * @return configuration - */ - public Configuration getConf() { - return conf; - } - - /** - * You can load configurations in precedence order. The first one takes - * precedence over any loaded later. - * - * @param confURL - */ - public void loadConf(URL confURL) throws ConfigurationException { - Configuration loadedConf = new PropertiesConfiguration(confURL); - conf.addConfiguration(loadedConf); - - } - - /** - * Add configuration object. 
- * - * @param otherConf configuration object - */ - public void addConf(Configuration otherConf) throws ConfigurationException { - conf.addConfiguration(otherConf); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/filter/ClientMessageFilter.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/filter/ClientMessageFilter.java b/hedwig-client/src/main/java/org/apache/hedwig/filter/ClientMessageFilter.java deleted file mode 100644 index 96ef0d9..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/filter/ClientMessageFilter.java +++ /dev/null @@ -1,24 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.filter; - -/** - * Message Filter running in client-side. 
- */ -public interface ClientMessageFilter extends MessageFilterBase { -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/filter/MessageFilterBase.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/filter/MessageFilterBase.java b/hedwig-client/src/main/java/org/apache/hedwig/filter/MessageFilterBase.java deleted file mode 100644 index e654038..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/filter/MessageFilterBase.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.filter; - -import com.google.protobuf.ByteString; -import org.apache.hedwig.protocol.PubSubProtocol.Message; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences; - -public interface MessageFilterBase { - - /** - * Set subscription preferences. - * - * <code>preferences</code> of the subscriber will be passed to message filter when - * the message filter attaches to its subscription either in server-side or client-side. - * - * @param topic - * Topic Name. - * @param subscriberId - * Subscriber Id. 
- * @param preferences - * Subscription Preferences. - * @return message filter - */ - public MessageFilterBase setSubscriptionPreferences(ByteString topic, ByteString subscriberId, - SubscriptionPreferences preferences); - - /** - * Tests whether a particular message passes the filter or not - * - * @param message - * @return - */ - public boolean testMessage(Message message); -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/filter/PipelineFilter.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/filter/PipelineFilter.java b/hedwig-client/src/main/java/org/apache/hedwig/filter/PipelineFilter.java deleted file mode 100644 index 6b473ad..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/filter/PipelineFilter.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hedwig.filter; - -import java.io.IOException; -import java.util.List; -import java.util.LinkedList; - -import com.google.protobuf.ByteString; -import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.hedwig.protocol.PubSubProtocol.Message; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences; - -/** - * A filter filters messages in pipeline. - */ -public class PipelineFilter extends LinkedList<ServerMessageFilter> -implements ServerMessageFilter { - - @Override - public ServerMessageFilter initialize(Configuration conf) - throws ConfigurationException, IOException { - for (ServerMessageFilter filter : this) { - filter.initialize(conf); - } - return this; - } - - @Override - public void uninitialize() { - while (!isEmpty()) { - ServerMessageFilter filter = removeLast(); - filter.uninitialize(); - } - } - - @Override - public MessageFilterBase setSubscriptionPreferences(ByteString topic, ByteString subscriberId, - SubscriptionPreferences preferences) { - for (ServerMessageFilter filter : this) { - filter.setSubscriptionPreferences(topic, subscriberId, preferences); - } - return this; - } - - @Override - public boolean testMessage(Message message) { - for (ServerMessageFilter filter : this) { - if (!filter.testMessage(message)) { - return false; - } - } - return true; - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/filter/ServerMessageFilter.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/filter/ServerMessageFilter.java b/hedwig-client/src/main/java/org/apache/hedwig/filter/ServerMessageFilter.java deleted file mode 100644 index f4c2248..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/filter/ServerMessageFilter.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the 
Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.filter; - -import java.io.IOException; - -import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.ConfigurationException; - -/** - * Message Filter running in server-side. Hub server uses reflection to - * instantiate a message filter to filter messages. - */ -public interface ServerMessageFilter extends MessageFilterBase { - - /** - * Initialize the message filter. - * - * @param conf - * Configuration Object. An <i>MessageFilter</i> might read settings from it. - * @return message filter - * @throws IOException when failed to initialize message filter - */ - public ServerMessageFilter initialize(Configuration conf) - throws ConfigurationException, IOException; - - /** - * Uninitialize the message filter. 
- */ - public void uninitialize(); - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/util/Callback.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/util/Callback.java b/hedwig-client/src/main/java/org/apache/hedwig/util/Callback.java deleted file mode 100644 index 78deef1..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/util/Callback.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.util; - -import org.apache.hedwig.exceptions.PubSubException; - -/** - * This class is used for callbacks for asynchronous operations - * - */ -public interface Callback<T> { - - /** - * This method is called when the asynchronous operation finishes - * - * @param ctx - * @param resultOfOperation - */ - public abstract void operationFinished(Object ctx, T resultOfOperation); - - /** - * This method is called when the operation failed due to some reason. The - * reason for failure is passed in. 
- * - * @param ctx - * The context for the callback - * @param exception - * The reason for the failure of the scan - */ - public abstract void operationFailed(Object ctx, PubSubException exception); - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/util/CallbackUtils.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/util/CallbackUtils.java b/hedwig-client/src/main/java/org/apache/hedwig/util/CallbackUtils.java deleted file mode 100644 index 8f0fda8..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/util/CallbackUtils.java +++ /dev/null @@ -1,183 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.util; - -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicInteger; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.exceptions.PubSubException.CompositeException; - -public class CallbackUtils { - - /** - * A callback that waits for all of a number of events to fire. 
If any fail, - * then fail the final callback with a composite exception. - * - * TODO: change this to use any Exception and make CompositeException - * generic, not a PubSubException. - * - * @param expected - * Number of expected callbacks. - * @param cb - * The final callback to call. - * @param ctx - * @param logger - * May be null. - * @param successMsg - * If not null, then this is logged on success. - * @param failureMsg - * If not null, then this is logged on failure. - * @param eagerErrorHandler - * If not null, then this will be executed after the first - * failure (but before the final failure callback). Useful for - * releasing resources, etc. as soon as we know the composite - * operation is doomed. - * @return the generated callback - */ - public static Callback<Void> multiCallback(final int expected, final Callback<Void> cb, final Object ctx, - final Logger logger, final String successMsg, final String failureMsg, - Runnable eagerErrorHandler) { - if (expected == 0) { - cb.operationFinished(ctx, null); - return null; - } else { - return new Callback<Void>() { - - final AtomicInteger done = new AtomicInteger(); - final LinkedBlockingQueue<PubSubException> exceptions = new LinkedBlockingQueue<PubSubException>(); - - private void tick() { - if (done.incrementAndGet() == expected) { - if (exceptions.isEmpty()) { - cb.operationFinished(ctx, null); - } else { - cb.operationFailed(ctx, new CompositeException(exceptions)); - } - } - } - - @Override - public void operationFailed(Object ctx, PubSubException exception) { - if (logger != null && failureMsg != null) - logger.error(failureMsg, exception); - exceptions.add(exception); - tick(); - } - - @Override - public void operationFinished(Object ctx, Void resultOfOperation) { - if (logger != null && successMsg != null) - logger.info(successMsg); - tick(); - } - - }; - } - } - - /** - * A callback that waits for all of a number of events to fire. 
If any fail, - * then fail the final callback with a composite exception. - */ - public static Callback<Void> multiCallback(int expected, Callback<Void> cb, Object ctx) { - return multiCallback(expected, cb, ctx, null, null, null, null); - } - - /** - * A callback that waits for all of a number of events to fire. If any fail, - * then fail the final callback with a composite exception. - */ - public static Callback<Void> multinCallback(int expected, Callback<Void> cb, Object ctx, Runnable eagerErrorHandler) { - return multiCallback(expected, cb, ctx, null, null, null, eagerErrorHandler); - } - - private static Callback<Void> nop = new Callback<Void>() { - - @Override - public void operationFailed(Object ctx, PubSubException exception) { - } - - @Override - public void operationFinished(Object ctx, Void resultOfOperation) { - } - - }; - - /** - * A do-nothing callback. - */ - public static Callback<Void> nop() { - return nop; - } - - /** - * Logs what happened before continuing the callback chain. - */ - public static <T> Callback<T> logger(final Logger logger, final String successMsg, - final String failureMsg, final Callback<T> cont) { - return new Callback<T>() { - - @Override - public void operationFailed(Object ctx, PubSubException exception) { - logger.error(failureMsg, exception); - if (cont != null) - cont.operationFailed(ctx, exception); - } - - @Override - public void operationFinished(Object ctx, T resultOfOperation) { - logger.info(successMsg); - if (cont != null) - cont.operationFinished(ctx, resultOfOperation); - } - - }; - } - - /** - * Logs what happened (no continuation). - */ - public static Callback<Void> logger(Logger logger, String successMsg, String failureMsg) { - return logger(logger, successMsg, failureMsg, nop()); - } - - /** - * Return a Callback<Void> that just calls the given Callback cb with the - * bound result. 
- */ - public static <T> Callback<Void> curry(final Callback<T> cb, final T result) { - return new Callback<Void>() { - - @Override - public void operationFailed(Object ctx, PubSubException exception) { - cb.operationFailed(ctx, exception); - } - - @Override - public void operationFinished(Object ctx, Void resultOfOperation) { - cb.operationFinished(ctx, result); - } - - }; - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/util/ConcurrencyUtils.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/util/ConcurrencyUtils.java b/hedwig-client/src/main/java/org/apache/hedwig/util/ConcurrencyUtils.java deleted file mode 100644 index 8f5f1ca..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/util/ConcurrencyUtils.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hedwig.util; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CyclicBarrier; - -public class ConcurrencyUtils { - - public static <T, U extends T, V extends BlockingQueue<T>> void put(V queue, U value) { - try { - queue.put(value); - } catch (Exception ex) { - throw new RuntimeException(ex); - } - } - - public static <T> T take(BlockingQueue<T> queue) { - try { - return queue.take(); - } catch (Exception ex) { - throw new RuntimeException(ex); - } - } - - public static void await(CyclicBarrier barrier) { - try { - barrier.await(); - } catch (Exception ex) { - throw new RuntimeException(ex); - } - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/util/Either.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/util/Either.java b/hedwig-client/src/main/java/org/apache/hedwig/util/Either.java deleted file mode 100644 index b8ae82f..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/util/Either.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hedwig.util; - -public class Either<T, U> { - - private T x; - private U y; - - private Either(T x, U y) { - this.x = x; - this.y = y; - } - - public static <T, U> Either<T, U> of(T x, U y) { - return new Either<T, U>(x, y); - } - - public static <T, U> Either<T, U> left(T x) { - return new Either<T, U>(x, null); - } - - public static <T, U> Either<T, U> right(U y) { - return new Either<T, U>(null, y); - } - - public T left() { - return x; - } - - public U right() { - return y; - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/util/FileUtils.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/util/FileUtils.java b/hedwig-client/src/main/java/org/apache/hedwig/util/FileUtils.java deleted file mode 100644 index 7bab01b..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/util/FileUtils.java +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hedwig.util; - -import java.io.File; -import java.io.IOException; -import java.util.LinkedList; -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class FileUtils { - - static DirDeleterThred dirDeleterThread; - static final Logger log = LoggerFactory.getLogger(FileUtils.class); - - static { - dirDeleterThread = new DirDeleterThred(); - Runtime.getRuntime().addShutdownHook(dirDeleterThread); - } - - public static File createTempDirectory(String prefix) throws IOException { - return createTempDirectory(prefix, null); - } - - public static File createTempDirectory(String prefix, String suffix) throws IOException { - File tempDir = File.createTempFile(prefix, suffix); - if (!tempDir.delete()) { - throw new IOException("Could not delete temp file: " + tempDir.getAbsolutePath()); - } - - if (!tempDir.mkdir()) { - throw new IOException("Could not create temp directory: " + tempDir.getAbsolutePath()); - } - - dirDeleterThread.addDirToDelete(tempDir); - return tempDir; - - } - - static class DirDeleterThred extends Thread { - List<File> dirsToDelete = new LinkedList<File>(); - - public synchronized void addDirToDelete(File dir) { - dirsToDelete.add(dir); - } - - @Override - public void run() { - synchronized (this) { - for (File dir : dirsToDelete) { - deleteDirectory(dir); - } - } - } - - protected void deleteDirectory(File dir) { - if (dir.isFile()) { - if (!dir.delete()) { - log.error("Could not delete " + dir.getAbsolutePath()); - } - return; - } - - File[] files = dir.listFiles(); - if (files == null) { - return; - } - - for (File f : files) { - deleteDirectory(f); - } - - if (!dir.delete()) { - log.error("Could not delete directory: " + dir.getAbsolutePath()); - } - - } - - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/util/HedwigSocketAddress.java ---------------------------------------------------------------------- diff --git 
a/hedwig-client/src/main/java/org/apache/hedwig/util/HedwigSocketAddress.java b/hedwig-client/src/main/java/org/apache/hedwig/util/HedwigSocketAddress.java deleted file mode 100644 index 8bfdada..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/util/HedwigSocketAddress.java +++ /dev/null @@ -1,143 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.util; - -import java.net.InetSocketAddress; - -/** - * This is a data wrapper class that is basically an InetSocketAddress with one - * extra piece of information for the SSL port (optional). This is used by - * Hedwig so we can encapsulate both regular and SSL port information in one - * data structure. Hedwig hub servers can be configured to listen on the - * standard regular port and additionally on an optional SSL port. The String - * representation of a HedwigSocketAddress is: <hostname>:<port>:<SSL - * port(optional)> - */ -public class HedwigSocketAddress { - - // Member fields that make up this class. 
- private final String hostname; - private final int port; - private final int sslPort; - - private final InetSocketAddress socketAddress; - private final InetSocketAddress sslSocketAddress; - - // Constants used by this class. - public static final String COLON = ":"; - private static final int NO_SSL_PORT = -1; - - // Constructor that takes in both a regular and SSL port. - public HedwigSocketAddress(String hostname, int port, int sslPort) { - this.hostname = hostname; - this.port = port; - this.sslPort = sslPort; - socketAddress = new InetSocketAddress(hostname, port); - if (sslPort != NO_SSL_PORT) - sslSocketAddress = new InetSocketAddress(hostname, sslPort); - else - sslSocketAddress = null; - } - - // Constructor that only takes in a regular port. - public HedwigSocketAddress(String hostname, int port) { - this(hostname, port, NO_SSL_PORT); - } - - // Constructor from a String "serialized" version of this class. - public HedwigSocketAddress(String addr) { - String[] parts = addr.split(COLON); - this.hostname = parts[0]; - this.port = Integer.parseInt(parts[1]); - if (parts.length > 2) - this.sslPort = Integer.parseInt(parts[2]); - else - this.sslPort = NO_SSL_PORT; - socketAddress = new InetSocketAddress(hostname, port); - if (sslPort != NO_SSL_PORT) - sslSocketAddress = new InetSocketAddress(hostname, sslPort); - else - sslSocketAddress = null; - } - - // Public getters - public String getHostname() { - return hostname; - } - - public int getPort() { - return port; - } - - public int getSSLPort() { - return sslPort; - } - - // Method to return an InetSocketAddress for the regular port. - public InetSocketAddress getSocketAddress() { - return socketAddress; - } - - // Method to return an InetSocketAddress for the SSL port. - // Note that if no SSL port (or an invalid value) was passed - // during object creation, this call will throw an IllegalArgumentException - // (runtime exception). 
- public InetSocketAddress getSSLSocketAddress() { - return sslSocketAddress; - } - - // Method to determine if this object instance is SSL enabled or not - // (contains a valid SSL port). - public boolean isSSLEnabled() { - return sslPort != NO_SSL_PORT; - } - - // Return the String "serialized" version of this object. - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append(hostname).append(COLON).append(port).append(COLON).append(sslPort); - return sb.toString(); - } - - // Implement an equals method comparing two HedwigSocketAddress objects. - @Override - public boolean equals(Object obj) { - if (!(obj instanceof HedwigSocketAddress)) - return false; - HedwigSocketAddress that = (HedwigSocketAddress) obj; - return (this.hostname.equals(that.hostname) && (this.port == that.port) && (this.sslPort == that.sslPort)); - } - - @Override - public int hashCode() { - return (this.hostname + this.port + this.sslPort).hashCode(); - } - - // Static helper method to return the string representation for an - // InetSocketAddress. The HedwigClient can only operate in SSL or non-SSL - // mode. So the server hosts it connects to will just be an - // InetSocketAddress instead of a HedwigSocketAddress. This utility method - // can be used so we can store these server hosts as strings (ByteStrings) - // in various places (e.g. list of server hosts we've connected to - // or wrote to unsuccessfully). 
- public static String sockAddrStr(InetSocketAddress addr) { - return addr.getAddress().getHostAddress() + ":" + addr.getPort(); - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/util/Option.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/util/Option.java b/hedwig-client/src/main/java/org/apache/hedwig/util/Option.java deleted file mode 100644 index 6a34782..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/util/Option.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hedwig.util; - -public class Option<T> { - - private T x; - - public static <T> Option<T> of(T x) { - return new Option<T>(x); - } - - public static <T> Option<T> of() { - return new Option<T>(); - } - - public Option() { - } - - public Option(T x) { - this.x = x; - } - - public T get() { - return x; - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/util/Pair.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/util/Pair.java b/hedwig-client/src/main/java/org/apache/hedwig/util/Pair.java deleted file mode 100644 index f0582b5..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/util/Pair.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hedwig.util; - -public class Pair<T, U> { - - private T x; - private U y; - - public Pair(T x, U y) { - this.x = x; - this.y = y; - } - - public static <T, U> Pair<T, U> of(T x, U y) { - return new Pair<T, U>(x, y); - } - - public T first() { - return x; - } - - public U second() { - return y; - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/util/PathUtils.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/util/PathUtils.java b/hedwig-client/src/main/java/org/apache/hedwig/util/PathUtils.java deleted file mode 100644 index 269286c..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/util/PathUtils.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.util; - -import java.io.File; -import java.util.ArrayList; -import java.util.List; - -public class PathUtils { - - /** Generate all prefixes for a path. 
"/a/b/c" -> ["/a","/a/b","/a/b/c"] */ - public static List<String> prefixes(String path) { - List<String> prefixes = new ArrayList<String>(); - StringBuilder prefix = new StringBuilder(); - for (String comp : path.split("/+")) { - // Skip the first (empty) path component. - if (!comp.equals("")) { - prefix.append("/").append(comp); - prefixes.add(prefix.toString()); - } - } - return prefixes; - } - - /** Return true iff prefix is a prefix of path. */ - public static boolean isPrefix(String prefix, String path) { - String[] as = prefix.split("/+"), bs = path.split("/+"); - if (as.length > bs.length) - return false; - for (int i = 0; i < as.length; i++) - if (!as[i].equals(bs[i])) - return false; - return true; - } - - /** Like File.getParent but always uses the / separator. */ - public static String parent(String path) { - return new File(path).getParent().replace("\\", "/"); - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/util/SubscriptionListener.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/util/SubscriptionListener.java b/hedwig-client/src/main/java/org/apache/hedwig/util/SubscriptionListener.java deleted file mode 100644 index 5085196..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/util/SubscriptionListener.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.util; - -import com.google.protobuf.ByteString; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent; - -/** - * This class is used for subscriber to listen on subscription event. - */ -public interface SubscriptionListener { - - /** - * Process an event from a subscription. - * <p> - * NOTE: It would be better to not run blocking operations in a - * listener implementation. - * </p> - * - * @param topic - * Topic Name - * @param subscriberId - * Subscriber Id - * @param event - * Event tell what happened to the subscription. - */ - public void processEvent(ByteString topic, ByteString subscriberId, - SubscriptionEvent event); -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/util/VarArgs.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/util/VarArgs.java b/hedwig-client/src/main/java/org/apache/hedwig/util/VarArgs.java deleted file mode 100644 index b8d22da..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/util/VarArgs.java +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.util; - -public class VarArgs { - - public static Object[] va(Object...args) { - return args; - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/test/java/org/apache/hedwig/util/TestFileUtils.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/test/java/org/apache/hedwig/util/TestFileUtils.java b/hedwig-client/src/test/java/org/apache/hedwig/util/TestFileUtils.java deleted file mode 100644 index 53e99b3..0000000 --- a/hedwig-client/src/test/java/org/apache/hedwig/util/TestFileUtils.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hedwig.util; - -import java.io.File; - -import org.junit.Test; - -import static org.junit.Assert.*; - -public class TestFileUtils { - - @Test(timeout=60000) - public void testCreateTmpDirectory() throws Exception { - String prefix = "abc"; - String suffix = "def"; - File dir = FileUtils.createTempDirectory(prefix, suffix); - assertTrue(dir.isDirectory()); - assertTrue(dir.getName().startsWith(prefix)); - assertTrue(dir.getName().endsWith(suffix)); - FileUtils.dirDeleterThread.start(); - FileUtils.dirDeleterThread.join(); - assertFalse(dir.exists()); - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/test/java/org/apache/hedwig/util/TestHedwigSocketAddress.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/test/java/org/apache/hedwig/util/TestHedwigSocketAddress.java b/hedwig-client/src/test/java/org/apache/hedwig/util/TestHedwigSocketAddress.java deleted file mode 100644 index b6bb78a..0000000 --- a/hedwig-client/src/test/java/org/apache/hedwig/util/TestHedwigSocketAddress.java +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hedwig.util; - -import java.net.InetSocketAddress; - -import org.junit.Test; - -import static org.junit.Assert.*; - -public class TestHedwigSocketAddress { - - // Common values used by tests - private String hostname = "localhost"; - private int port = 4080; - private int sslPort = 9876; - private int invalidPort = -9999; - private String COLON = ":"; - - @Test(timeout=60000) - public void testCreateWithSSLPort() throws Exception { - HedwigSocketAddress addr = new HedwigSocketAddress(hostname, port, sslPort); - assertTrue(addr.getSocketAddress().equals(new InetSocketAddress(hostname, port))); - assertTrue(addr.getSSLSocketAddress().equals(new InetSocketAddress(hostname, sslPort))); - } - - @Test(timeout=60000) - public void testCreateWithNoSSLPort() throws Exception { - HedwigSocketAddress addr = new HedwigSocketAddress(hostname, port); - assertTrue(addr.getSocketAddress().equals(new InetSocketAddress(hostname, port))); - assertTrue(addr.getSSLSocketAddress() == null); - } - - @Test(timeout=60000) - public void testCreateFromStringWithSSLPort() throws Exception { - HedwigSocketAddress addr = new HedwigSocketAddress(hostname+COLON+port+COLON+sslPort); - assertTrue(addr.getSocketAddress().equals(new InetSocketAddress(hostname, port))); - assertTrue(addr.getSSLSocketAddress().equals(new InetSocketAddress(hostname, sslPort))); - } - - @Test(timeout=60000) - public void testCreateFromStringWithNoSSLPort() throws Exception { - HedwigSocketAddress addr = new HedwigSocketAddress(hostname+COLON+port); - assertTrue(addr.getSocketAddress().equals(new InetSocketAddress(hostname, port))); - assertTrue(addr.getSSLSocketAddress() == null); - } - - @Test(timeout=60000) - public void testCreateWithInvalidRegularPort() throws Exception { - boolean success = false; - try { - new HedwigSocketAddress(hostname+COLON+invalidPort); - } - catch (IllegalArgumentException e) { - success = true; - } - assertTrue(success); - } - - @Test(timeout=60000) - public void 
testCreateWithInvalidSSLPort() throws Exception { - boolean success = false; - try { - new HedwigSocketAddress(hostname, port, invalidPort); - } - catch (IllegalArgumentException e) { - success = true; - } - assertTrue(success); - } - - @Test(timeout=60000) - public void testToStringConversion() throws Exception { - HedwigSocketAddress addr = new HedwigSocketAddress(hostname, port, sslPort); - HedwigSocketAddress addr2 = new HedwigSocketAddress(addr.toString()); - assertTrue(addr.getSocketAddress().equals(addr2.getSocketAddress())); - assertTrue(addr.getSSLSocketAddress().equals(addr2.getSSLSocketAddress())); - addr.toString().equals(addr2.toString()); - } - - @Test(timeout=60000) - public void testIsSSLEnabledFlag() throws Exception { - HedwigSocketAddress sslAddr = new HedwigSocketAddress(hostname, port, sslPort); - assertTrue(sslAddr.isSSLEnabled()); - HedwigSocketAddress addr = new HedwigSocketAddress(hostname, port); - assertFalse(addr.isSSLEnabled()); - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/test/java/org/apache/hedwig/util/TestPathUtils.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/test/java/org/apache/hedwig/util/TestPathUtils.java b/hedwig-client/src/test/java/org/apache/hedwig/util/TestPathUtils.java deleted file mode 100644 index a596841..0000000 --- a/hedwig-client/src/test/java/org/apache/hedwig/util/TestPathUtils.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.util; - -import java.util.Arrays; - -import org.junit.Test; - -import static org.junit.Assert.*; - -public class TestPathUtils { - - @Test(timeout=60000) - public void testPrefixes() { - assertEquals(Arrays.asList(new String[] { "/a", "/a/b", "/a/b/c" }), PathUtils.prefixes("/a/b/c")); - assertEquals(Arrays.asList(new String[] { "/a", "/a/b", "/a/b/c" }), PathUtils.prefixes("///a///b///c")); - - } - - @Test(timeout=60000) - public void testIsPrefix() { - String[] paths = new String[] { "/", "/a", "/a/b" }; - for (int i = 0; i < paths.length; i++) { - for (int j = 0; j <= i; j++) { - assertTrue(PathUtils.isPrefix(paths[j], paths[i])); - assertTrue(PathUtils.isPrefix(paths[j], paths[i] + "/")); - assertTrue(PathUtils.isPrefix(paths[j] + "/", paths[i])); - assertTrue(PathUtils.isPrefix(paths[j] + "/", paths[i] + "/")); - } - for (int j = i + 1; j < paths.length; j++) { - assertFalse(PathUtils.isPrefix(paths[j], paths[i])); - assertFalse(PathUtils.isPrefix(paths[j], paths[i] + "/")); - assertFalse(PathUtils.isPrefix(paths[j] + "/", paths[i])); - assertFalse(PathUtils.isPrefix(paths[j] + "/", paths[i] + "/")); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-protocol/pom.xml ---------------------------------------------------------------------- diff --git a/hedwig-protocol/pom.xml b/hedwig-protocol/pom.xml deleted file mode 100644 index c0609ae..0000000 --- a/hedwig-protocol/pom.xml +++ /dev/null @@ -1,116 +0,0 @@ -<?xml version="1.0"?> -<!-- - Licensed to the Apache Software Foundation 
(ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.bookkeeper</groupId> - <artifactId>bookkeeper</artifactId> - <version>4.4.0-SNAPSHOT</version> - </parent> - <artifactId>hedwig-protocol</artifactId> - <packaging>jar</packaging> - <name>hedwig-protocol</name> - <url>http://maven.apache.org</url> - <dependencies> - <dependency> - <groupId>com.google.protobuf</groupId> - <artifactId>protobuf-java</artifactId> - <version>${protobuf.version}</version> - <scope>compile</scope> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <version>4.8.1</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - <version>1.6.4</version> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - <version>1.6.4</version> - </dependency> - </dependencies> - <repositories> - </repositories> - <build> - <defaultGoal>install</defaultGoal> - <plugins> - <plugin> - 
<artifactId>maven-assembly-plugin</artifactId> - <version>2.2.1</version> - <configuration> - <skipAssembly>true</skipAssembly> - </configuration> - </plugin> - <plugin> - <groupId>org.apache.rat</groupId> - <artifactId>apache-rat-plugin</artifactId> - <version>0.7</version> - <configuration> - <excludes> - <!-- exclude generated file //--> - <exclude>**/PubSubProtocol.java</exclude> - </excludes> - </configuration> - </plugin> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>findbugs-maven-plugin</artifactId> - <configuration> - <excludeFilterFile>${basedir}/src/main/resources/findbugsExclude.xml</excludeFilterFile> - </configuration> - </plugin> - </plugins> - </build> - <profiles> - <profile> - <id>protobuf</id> - <build> - <plugins> - <plugin> - <artifactId>maven-antrun-plugin</artifactId> - <executions> - <execution> - <phase>generate-sources</phase> - <id>default-cli</id> - <configuration> - <target> - <exec executable="protoc" failonerror="true"> - <arg value="--java_out=src/main/java" /> - <arg value="src/main/protobuf/PubSubProtocol.proto" /> - </exec> - </target> - </configuration> - <goals> - <goal>run</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - </build> - </profile> - </profiles> -</project> http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-protocol/src/main/java/org/apache/hedwig/exceptions/PubSubException.java ---------------------------------------------------------------------- diff --git a/hedwig-protocol/src/main/java/org/apache/hedwig/exceptions/PubSubException.java b/hedwig-protocol/src/main/java/org/apache/hedwig/exceptions/PubSubException.java deleted file mode 100644 index 2e8dc09..0000000 --- a/hedwig-protocol/src/main/java/org/apache/hedwig/exceptions/PubSubException.java +++ /dev/null @@ -1,254 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hedwig.exceptions;

import java.util.Collection;
import java.util.Iterator;

import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;

/**
 * Base class of the checked exceptions declared in this file. Every instance
 * carries the {@link StatusCode} it was constructed with; each nested subclass
 * passes one fixed status code to the base constructor.
 *
 * <p>Use {@link #create(StatusCode, String)} to turn a status code and a
 * message into the matching subclass.
 */
@SuppressWarnings("serial")
public abstract class PubSubException extends Exception {
    /** Status code supplied at construction time; returned by {@link #getCode()}. */
    protected StatusCode code;

    /**
     * @param code status code to associate with this exception
     * @param msg  detail message
     */
    protected PubSubException(StatusCode code, String msg) {
        super(msg);
        this.code = code;
    }

    /**
     * @param code status code to associate with this exception
     * @param t    cause of this exception
     */
    protected PubSubException(StatusCode code, Throwable t) {
        super(t);
        this.code = code;
    }

    /**
     * @param code status code to associate with this exception
     * @param msg  detail message
     * @param t    cause of this exception
     */
    protected PubSubException(StatusCode code, String msg, Throwable t) {
        super(msg, t);
        this.code = code;
    }

    /**
     * Builds the exception subclass that corresponds to the given status code.
     *
     * <p>Any code without a dedicated branch below (including {@code null})
     * yields an {@link UnexpectedConditionException} whose message embeds the
     * unrecognized code and the original message.
     *
     * @param code status code to translate; may be {@code null}
     * @param msg  detail message handed to the created exception
     * @return a new exception instance, never {@code null}
     */
    public static PubSubException create(StatusCode code, String msg) {
        // A null code cannot be switched on and has no number to report, so
        // route it to the catch-all exception instead of failing with an NPE.
        if (code == null) {
            return new UnexpectedConditionException("Unknown status code:null, msg: " + msg);
        }
        switch (code) {
        case CLIENT_ALREADY_SUBSCRIBED:
            return new ClientAlreadySubscribedException(msg);
        case CLIENT_NOT_SUBSCRIBED:
            return new ClientNotSubscribedException(msg);
        case MALFORMED_REQUEST:
            return new MalformedRequestException(msg);
        case NO_SUCH_TOPIC:
            return new NoSuchTopicException(msg);
        case NOT_RESPONSIBLE_FOR_TOPIC:
            return new ServerNotResponsibleForTopicException(msg);
        case SERVICE_DOWN:
            return new ServiceDownException(msg);
        case COULD_NOT_CONNECT:
            return new CouldNotConnectException(msg);
        case TOPIC_BUSY:
            return new TopicBusyException(msg);
        case BAD_VERSION:
            return new BadVersionException(msg);
        case NO_TOPIC_PERSISTENCE_INFO:
            return new NoTopicPersistenceInfoException(msg);
        case TOPIC_PERSISTENCE_INFO_EXISTS:
            return new TopicPersistenceInfoExistsException(msg);
        case NO_SUBSCRIPTION_STATE:
            return new NoSubscriptionStateException(msg);
        case SUBSCRIPTION_STATE_EXISTS:
            return new SubscriptionStateExistsException(msg);
        case NO_TOPIC_OWNER_INFO:
            return new NoTopicOwnerInfoException(msg);
        case TOPIC_OWNER_INFO_EXISTS:
            return new TopicOwnerInfoExistsException(msg);
        case INVALID_MESSAGE_FILTER:
            return new InvalidMessageFilterException(msg);
        case RESUBSCRIBE_EXCEPTION:
            return new ResubscribeException(msg);
        /*
         * Insert new ones here
         */
        case UNCERTAIN_STATE:
            return new UncertainStateException(msg);
        // Finally the catch all exception (for unexpected error conditions)
        default:
            return new UnexpectedConditionException("Unknown status code:" + code.getNumber() + ", msg: " + msg);
        }
    }

    /**
     * @return the status code this exception was constructed with
     */
    public StatusCode getCode() {
        return code;
    }

    /** Exception carrying {@code StatusCode.CLIENT_ALREADY_SUBSCRIBED}. */
    public static class ClientAlreadySubscribedException extends PubSubException {
        public ClientAlreadySubscribedException(String msg) {
            super(StatusCode.CLIENT_ALREADY_SUBSCRIBED, msg);
        }
    }

    /** Exception carrying {@code StatusCode.CLIENT_NOT_SUBSCRIBED}. */
    public static class ClientNotSubscribedException extends PubSubException {
        public ClientNotSubscribedException(String msg) {
            super(StatusCode.CLIENT_NOT_SUBSCRIBED, msg);
        }
    }

    /** Exception carrying {@code StatusCode.RESUBSCRIBE_EXCEPTION}. */
    public static class ResubscribeException extends PubSubException {
        public ResubscribeException(String msg) {
            super(StatusCode.RESUBSCRIBE_EXCEPTION, msg);
        }
    }

    /** Exception carrying {@code StatusCode.MALFORMED_REQUEST}. */
    public static class MalformedRequestException extends PubSubException {
        public MalformedRequestException(String msg) {
            super(StatusCode.MALFORMED_REQUEST, msg);
        }
    }

    /** Exception carrying {@code StatusCode.NO_SUCH_TOPIC}. */
    public static class NoSuchTopicException extends PubSubException {
        public NoSuchTopicException(String msg) {
            super(StatusCode.NO_SUCH_TOPIC, msg);
        }
    }

    /** Exception carrying {@code StatusCode.NOT_RESPONSIBLE_FOR_TOPIC}. */
    public static class ServerNotResponsibleForTopicException extends PubSubException {
        // Note the exception message serves as the name of the responsible host
        public ServerNotResponsibleForTopicException(String responsibleHost) {
            super(StatusCode.NOT_RESPONSIBLE_FOR_TOPIC, responsibleHost);
        }
    }

    /** Exception carrying {@code StatusCode.TOPIC_BUSY}. */
    public static class TopicBusyException extends PubSubException {
        public TopicBusyException(String msg) {
            super(StatusCode.TOPIC_BUSY, msg);
        }
    }

    /** Exception carrying {@code StatusCode.SERVICE_DOWN}. */
    public static class ServiceDownException extends PubSubException {
        public ServiceDownException(String msg) {
            super(StatusCode.SERVICE_DOWN, msg);
        }

        public ServiceDownException(Exception e) {
            super(StatusCode.SERVICE_DOWN, e);
        }

        public ServiceDownException(String msg, Throwable t) {
            super(StatusCode.SERVICE_DOWN, msg, t);
        }
    }

    /** Exception carrying {@code StatusCode.COULD_NOT_CONNECT}. */
    public static class CouldNotConnectException extends PubSubException {
        public CouldNotConnectException(String msg) {
            super(StatusCode.COULD_NOT_CONNECT, msg);
        }
    }

    /** Exception carrying {@code StatusCode.BAD_VERSION}. */
    public static class BadVersionException extends PubSubException {
        public BadVersionException(String msg) {
            super(StatusCode.BAD_VERSION, msg);
        }
    }

    /** Exception carrying {@code StatusCode.NO_TOPIC_PERSISTENCE_INFO}. */
    public static class NoTopicPersistenceInfoException extends PubSubException {
        public NoTopicPersistenceInfoException(String msg) {
            super(StatusCode.NO_TOPIC_PERSISTENCE_INFO, msg);
        }
    }

    /** Exception carrying {@code StatusCode.TOPIC_PERSISTENCE_INFO_EXISTS}. */
    public static class TopicPersistenceInfoExistsException extends PubSubException {
        public TopicPersistenceInfoExistsException(String msg) {
            super(StatusCode.TOPIC_PERSISTENCE_INFO_EXISTS, msg);
        }
    }

    /** Exception carrying {@code StatusCode.NO_SUBSCRIPTION_STATE}. */
    public static class NoSubscriptionStateException extends PubSubException {
        public NoSubscriptionStateException(String msg) {
            super(StatusCode.NO_SUBSCRIPTION_STATE, msg);
        }
    }

    /** Exception carrying {@code StatusCode.SUBSCRIPTION_STATE_EXISTS}. */
    public static class SubscriptionStateExistsException extends PubSubException {
        public SubscriptionStateExistsException(String msg) {
            super(StatusCode.SUBSCRIPTION_STATE_EXISTS, msg);
        }
    }

    /** Exception carrying {@code StatusCode.NO_TOPIC_OWNER_INFO}. */
    public static class NoTopicOwnerInfoException extends PubSubException {
        public NoTopicOwnerInfoException(String msg) {
            super(StatusCode.NO_TOPIC_OWNER_INFO, msg);
        }
    }

    /** Exception carrying {@code StatusCode.TOPIC_OWNER_INFO_EXISTS}. */
    public static class TopicOwnerInfoExistsException extends PubSubException {
        public TopicOwnerInfoExistsException(String msg) {
            super(StatusCode.TOPIC_OWNER_INFO_EXISTS, msg);
        }
    }

    /** Exception carrying {@code StatusCode.INVALID_MESSAGE_FILTER}. */
    public static class InvalidMessageFilterException extends PubSubException {
        public InvalidMessageFilterException(String msg) {
            super(StatusCode.INVALID_MESSAGE_FILTER, msg);
        }

        public InvalidMessageFilterException(String msg, Throwable t) {
            super(StatusCode.INVALID_MESSAGE_FILTER, msg, t);
        }
    }

    /** Exception carrying {@code StatusCode.UNCERTAIN_STATE}. */
    public static class UncertainStateException extends PubSubException {
        public UncertainStateException(String msg) {
            super(StatusCode.UNCERTAIN_STATE, msg);
        }
    }

    // The catch all exception (for unexpected error conditions)
    /** Exception carrying {@code StatusCode.UNEXPECTED_CONDITION}. */
    public static class UnexpectedConditionException extends PubSubException {
        public UnexpectedConditionException(String msg) {
            super(StatusCode.UNEXPECTED_CONDITION, msg);
        }

        public UnexpectedConditionException(String msg, Throwable t) {
            super(StatusCode.UNEXPECTED_CONDITION, msg, t);
        }
    }

    // The composite exception (for concurrent operations).
    /**
     * Exception carrying {@code StatusCode.COMPOSITE} that wraps a collection
     * of other {@link PubSubException}s. Its message is built once, at
     * construction time, from the messages of the wrapped exceptions.
     */
    public static class CompositeException extends PubSubException {
        /** The collection instance passed to the constructor (not copied). */
        private final Collection<PubSubException> exceptions;

        /**
         * @param exceptions the exceptions to wrap; iterated immediately to
         *                   build the message, so it must not be {@code null}
         */
        public CompositeException(Collection<PubSubException> exceptions) {
            super(StatusCode.COMPOSITE, compositeMessage(exceptions));
            this.exceptions = exceptions;
        }

        /**
         * @return the same collection instance that was given to the constructor
         */
        public Collection<PubSubException> getExceptions() {
            return exceptions;
        }

        /** Merges the message fields of the given Exceptions into a one line string. */
        private static String compositeMessage(Collection<PubSubException> exceptions) {
            StringBuilder builder = new StringBuilder("Composite exception: [");
            Iterator<PubSubException> iter = exceptions.iterator();
            // The first message is appended bare; every later one is preceded
            // by the " :: " separator.
            if (iter.hasNext()) {
                builder.append(iter.next().getMessage());
            }
            while (iter.hasNext()) {
                builder.append(" :: ").append(iter.next().getMessage());
            }
            return builder.append("]").toString();
        }
    }

    /** Unchecked exception type with no message or state of its own. */
    public static class ClientNotSubscribedRuntimeException extends RuntimeException {
    }

}