http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java b/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java deleted file mode 100644 index 38dfc3c..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java +++ /dev/null @@ -1,148 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.client.benchmark; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.Callable; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.protobuf.ByteString; - -import org.apache.bookkeeper.util.MathUtils; -import org.apache.hedwig.client.api.MessageHandler; -import org.apache.hedwig.client.api.Subscriber; -import org.apache.hedwig.client.benchmark.BenchmarkUtils.BenchmarkCallback; -import org.apache.hedwig.client.benchmark.BenchmarkUtils.ThroughputAggregator; -import org.apache.hedwig.client.benchmark.BenchmarkUtils.ThroughputLatencyAggregator; -import org.apache.hedwig.protocol.PubSubProtocol.Message; -import org.apache.hedwig.protocol.PubSubProtocol.RegionSpecificSeqId; -import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions; -import org.apache.hedwig.util.Callback; - -public class BenchmarkSubscriber extends BenchmarkWorker implements Callable<Void> { - private static final Logger logger = LoggerFactory.getLogger(BenchmarkSubscriber.class); - Subscriber subscriber; - ByteString subId; - - - public BenchmarkSubscriber(int numTopics, int numMessages, int numRegions, - int startTopicLabel, int partitionIndex, int numPartitions, Subscriber subscriber, ByteString subId) { - super(numTopics, numMessages, numRegions, startTopicLabel, partitionIndex, numPartitions); - this.subscriber = subscriber; - this.subId = subId; - } - - public void warmup(int numWarmup) throws InterruptedException { - /* - * multiplying the number of ops by numParitions because we end up - * skipping many because of the partitioning logic - */ - multiSub("warmup", "warmup", 0, numWarmup, numWarmup * numPartitions); - } - - public Void call() throws Exception { - - final ThroughputAggregator agg = new ThroughputAggregator("recvs", numMessages); - agg.startProgress(); - - final Map<String, Long> lastSeqIdSeenMap = new HashMap<String, Long>(); - - for (int i = startTopicLabel; i < startTopicLabel + numTopics; i++) { - - if (!HedwigBenchmark.amIResponsibleForTopic(i, partitionIndex, numPartitions)) { - continue; - } - - final String topic = HedwigBenchmark.TOPIC_PREFIX + i; - - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - subscriber.subscribe(ByteString.copyFromUtf8(topic), subId, opts); - subscriber.startDelivery(ByteString.copyFromUtf8(topic), subId, new MessageHandler() { - - @Override - public void deliver(ByteString thisTopic, ByteString subscriberId, Message msg, - Callback<Void> callback, Object context) { - logger.debug("Got message from src-region: {} with seq-id: {}", - msg.getSrcRegion(), msg.getMsgId()); - - String mapKey = topic + msg.getSrcRegion().toStringUtf8(); - Long lastSeqIdSeen = lastSeqIdSeenMap.get(mapKey); - if (lastSeqIdSeen == null) { - lastSeqIdSeen = (long) 0; - } - - if (getSrcSeqId(msg) <= lastSeqIdSeen) { - logger.info("Redelivery of message, src-region: " + msg.getSrcRegion() + "seq-id: " - + msg.getMsgId()); - } else { - agg.ding(false); - } - - callback.operationFinished(context, null); - } - }); - } - System.out.println("Finished subscribing to topics and now waiting for messages to come in..."); - // Wait till the benchmark test has completed - agg.queue.take(); - System.out.println(agg.summarize(agg.earliest.get())); - return null; - } - - long getSrcSeqId(Message msg) { - if (msg.getMsgId().getRemoteComponentsCount() == 0) { - return msg.getMsgId().getLocalComponent(); - } - - for (RegionSpecificSeqId rseqId : msg.getMsgId().getRemoteComponentsList()) { - if (rseqId.getRegion().equals(msg.getSrcRegion())) - return rseqId.getSeqId(); - } - - return msg.getMsgId().getLocalComponent(); - } - - void multiSub(String label, String topicPrefix, int start, final int npar, final int count) - throws InterruptedException { - long startTime = MathUtils.now(); - ThroughputLatencyAggregator agg = new ThroughputLatencyAggregator(label, count / numPartitions, npar); - agg.startProgress(); - - int end = start + count; - for (int i = start; i < end; ++i) { - if (!HedwigBenchmark.amIResponsibleForTopic(i, partitionIndex, numPartitions)) { - continue; - } - SubscriptionOptions opts = SubscriptionOptions.newBuilder() - .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build(); - subscriber.asyncSubscribe(ByteString.copyFromUtf8(topicPrefix + i), - subId, opts, - new BenchmarkCallback(agg), null); - } - // Wait till the benchmark test has completed - agg.tpAgg.queue.take(); - if (count > 1) - System.out.println(agg.summarize(startTime)); - } - -}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkUtils.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkUtils.java b/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkUtils.java deleted file mode 100644 index 3efe22d..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkUtils.java +++ /dev/null @@ -1,191 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.client.benchmark; - -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.bookkeeper.util.MathUtils; -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.util.Callback; - -public class BenchmarkUtils { - static final Logger logger = LoggerFactory.getLogger(BenchmarkUtils.class); - - public static double calcTp(final int count, long startTime) { - return 1000. * count / (MathUtils.now() - startTime); - } - - /** - * Stats aggregator for callback (round-trip) operations. Measures both - * throughput and latency. - */ - public static class ThroughputLatencyAggregator { - int numBuckets; - final ThroughputAggregator tpAgg; - final Semaphore outstanding; - final AtomicLong sum = new AtomicLong(); - - final AtomicLong[] latencyBuckets; - - // bucket[i] is count of number of operations that took >= i ms and < - // (i+1) ms. - - public ThroughputLatencyAggregator(String label, int count, int limit) throws InterruptedException { - numBuckets = Integer.getInteger("numBuckets", 101); - latencyBuckets = new AtomicLong[numBuckets]; - tpAgg = new ThroughputAggregator(label, count); - outstanding = new Semaphore(limit); - for (int i = 0; i < numBuckets; i++) { - latencyBuckets[i] = new AtomicLong(); - } - } - - public void startProgress() { - tpAgg.startProgress(); - } - - public void reportLatency(long latency) { - sum.addAndGet(latency); - - int bucketIndex; - if (latency >= numBuckets) { - bucketIndex = (int) numBuckets - 1; - } else { - bucketIndex = (int) latency; - } - latencyBuckets[bucketIndex].incrementAndGet(); - } - - private String getPercentile(double percentile) { - int numInliersNeeded = (int) (percentile / 100 * tpAgg.count); - int numInliersFound = 0; - for (int i = 0; i < numBuckets - 1; i++) { - numInliersFound += latencyBuckets[i].intValue(); - if (numInliersFound > numInliersNeeded) { - return i + ""; - } - } - return " >= " + (numBuckets - 1); - } - - public String summarize(long startTime) { - double percentile = Double.parseDouble(System.getProperty("percentile", "99.9")); - return tpAgg.summarize(startTime) + ", avg latency = " + sum.get() / tpAgg.count + ", " + percentile - + "%ile latency = " + getPercentile(percentile); - } - } - - /** - * Stats aggregator for non-callback (single-shot) operations. Measures just - * throughput. - */ - public static class ThroughputAggregator { - final String label; - final int count; - final AtomicInteger done = new AtomicInteger(); - final AtomicLong earliest = new AtomicLong(); - final AtomicInteger numFailed = new AtomicInteger(); - final Thread progressThread; - final LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>(); - - public ThroughputAggregator(final String label, final int count) { - this.label = label; - this.count = count; - if (count == 0) - queue.add(0); - if (Boolean.getBoolean("progress")) { - progressThread = new Thread(new Runnable() { - @Override - public void run() { - try { - for (int doneSnap = 0, prev = 0; doneSnap < count; prev = doneSnap, doneSnap = done.get()) { - if (doneSnap > prev) { - System.out.println(label + " progress: " + doneSnap + " of " + count); - } - Thread.sleep(1000); - } - } catch (Exception ex) { - throw new RuntimeException(ex); - } - } - }); - } else { - progressThread = null; - } - } - - public void startProgress() { - if (progressThread != null) { - progressThread.start(); - } - } - - public void ding(boolean failed) { - int snapDone = done.incrementAndGet(); - earliest.compareAndSet(0, MathUtils.now()); - if (failed) - numFailed.incrementAndGet(); - if (logger.isDebugEnabled()) - logger.debug(label + " " + (failed ? "failed" : "succeeded") + ", done so far = " + snapDone); - if (snapDone == count) { - queue.add(numFailed.get()); - } - } - - public String summarize(long startTime) { - return "Finished " + label + ": count = " + done.get() + ", tput = " + calcTp(count, startTime) - + " ops/s, numFailed = " + numFailed; - } - } - - public static class BenchmarkCallback implements Callback<Void> { - - final ThroughputLatencyAggregator agg; - final long startTime; - - public BenchmarkCallback(ThroughputLatencyAggregator agg) throws InterruptedException { - this.agg = agg; - agg.outstanding.acquire(); - // Must set the start time *after* taking acquiring on outstanding. - startTime = MathUtils.now(); - } - - private void finish(boolean failed) { - agg.reportLatency(MathUtils.now() - startTime); - agg.tpAgg.ding(failed); - agg.outstanding.release(); - } - - @Override - public void operationFinished(Object ctx, Void resultOfOperation) { - finish(false); - } - - @Override - public void operationFailed(Object ctx, PubSubException exception) { - finish(true); - } - }; - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkWorker.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkWorker.java b/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkWorker.java deleted file mode 100644 index e7b15f2..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkWorker.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.client.benchmark; - -public class BenchmarkWorker { - int numTopics; - int numMessages; - int numRegions; - int startTopicLabel; - int partitionIndex; - int numPartitions; - - public BenchmarkWorker(int numTopics, int numMessages, int numRegions, - int startTopicLabel, int partitionIndex, int numPartitions) { - this.numTopics = numTopics; - this.numMessages = numMessages; - this.numRegions = numRegions; - this.startTopicLabel = startTopicLabel; - this.partitionIndex = partitionIndex; - this.numPartitions = numPartitions; - - if (numMessages % (numTopics * numRegions) != 0) { - throw new RuntimeException("Number of messages not equally divisible among regions and topics"); - } - - if (numTopics % numPartitions != 0) { - throw new RuntimeException("Number of topics not equally divisible among partitions"); - } - - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/HedwigBenchmark.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/HedwigBenchmark.java b/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/HedwigBenchmark.java deleted file mode 100644 index cc5e937..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/HedwigBenchmark.java +++ /dev/null @@ -1,165 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.client.benchmark; - -import java.io.File; -import java.util.concurrent.Callable; - -import org.apache.commons.configuration.ConfigurationException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.jboss.netty.logging.InternalLoggerFactory; -import org.jboss.netty.logging.Log4JLoggerFactory; - -import com.google.protobuf.ByteString; -import org.apache.hedwig.util.HedwigSocketAddress; -import org.apache.hedwig.client.conf.ClientConfiguration; -import org.apache.hedwig.client.HedwigClient; -import org.apache.hedwig.client.api.Publisher; -import org.apache.hedwig.client.api.Subscriber; - -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; -import org.apache.commons.cli.PosixParser; -import org.apache.commons.cli.ParseException; - -public class HedwigBenchmark implements Callable<Void> { - protected static final Logger logger = LoggerFactory.getLogger(HedwigBenchmark.class); - - static final String TOPIC_PREFIX = "topic"; - - private final HedwigClient client; - private final Publisher publisher; - private final Subscriber subscriber; - private final CommandLine cmd; - - public HedwigBenchmark(ClientConfiguration cfg, CommandLine cmd) { - client = new HedwigClient(cfg); - publisher = client.getPublisher(); - subscriber = client.getSubscriber(); - this.cmd = cmd; - } - - static boolean amIResponsibleForTopic(int topicNum, int partitionIndex, int numPartitions) { - return topicNum % numPartitions == partitionIndex; - } - - @Override - public Void call() throws Exception { - - // - // Parameters. - // - - // What program to run: pub, sub (subscription benchmark), recv. - final String mode = cmd.getOptionValue("mode",""); - - // Number of requests to make (publishes or subscribes). - int numTopics = Integer.valueOf(cmd.getOptionValue("nTopics", "50")); - int numMessages = Integer.valueOf(cmd.getOptionValue("nMsgs", "1000")); - int numRegions = Integer.valueOf(cmd.getOptionValue("nRegions", "1")); - int startTopicLabel = Integer.valueOf(cmd.getOptionValue("startTopicLabel", "0")); - int partitionIndex = Integer.valueOf(cmd.getOptionValue("partitionIndex", "0")); - int numPartitions = Integer.valueOf(cmd.getOptionValue("nPartitions", "1")); - - int replicaIndex = Integer.valueOf(cmd.getOptionValue("replicaIndex", "0")); - - int rate = Integer.valueOf(cmd.getOptionValue("rate", "0")); - int nParallel = Integer.valueOf(cmd.getOptionValue("npar", "100")); - int msgSize = Integer.valueOf(cmd.getOptionValue("msgSize", "1024")); - - // Number of warmup subscriptions to make. - final int nWarmups = Integer.valueOf(cmd.getOptionValue("nwarmups", "1000")); - - if (mode.equals("sub")) { - BenchmarkSubscriber benchmarkSub = new BenchmarkSubscriber(numTopics, 0, 1, startTopicLabel, 0, 1, - subscriber, ByteString.copyFromUtf8("mySub")); - - benchmarkSub.warmup(nWarmups); - benchmarkSub.call(); - - } else if (mode.equals("recv")) { - - BenchmarkSubscriber benchmarkSub = new BenchmarkSubscriber(numTopics, numMessages, numRegions, - startTopicLabel, partitionIndex, numPartitions, subscriber, ByteString.copyFromUtf8("sub-" - + replicaIndex)); - - benchmarkSub.call(); - - } else if (mode.equals("pub")) { - // Offered load in msgs/second. - BenchmarkPublisher benchmarkPub = new BenchmarkPublisher(numTopics, numMessages, numRegions, - startTopicLabel, partitionIndex, numPartitions, publisher, subscriber, msgSize, nParallel, rate); - benchmarkPub.warmup(nWarmups); - benchmarkPub.call(); - - } else { - throw new Exception("unknown mode: " + mode); - } - - return null; - } - - public static void main(String[] args) throws Exception { - Options options = new Options(); - options.addOption("mode", true, "sub, recv, or pub"); - options.addOption("nTopics", true, "Number of topics, default 50"); - options.addOption("nMsgs", true, "Number of messages, default 1000"); - options.addOption("nRegions", true, "Number of regsions, default 1"); - options.addOption("startTopicLabel", true, - "Prefix of topic labels. Must be numeric. Default 0"); - options.addOption("partitionIndex", true, "If partitioning, the partition index for this client"); - options.addOption("nPartitions", true, "Number of partitions, default 1"); - options.addOption("replicaIndex", true, "default 0"); - options.addOption("rate", true, "default 0"); - options.addOption("npar", true, "default 100"); - options.addOption("msgSize", true, "Size of messages, default 1024"); - options.addOption("nwarmups", true, "Number of warmup messages, default 1000"); - options.addOption("defaultHub", true, "Default hedwig hub to connect to, default localhost:4080"); - - CommandLineParser parser = new PosixParser(); - final CommandLine cmd = parser.parse(options, args); - - if (cmd.hasOption("help")) { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp("HedwigBenchmark <options>", options); - System.exit(-1); - } - - ClientConfiguration cfg = new ClientConfiguration() { - public HedwigSocketAddress getDefaultServerHedwigSocketAddress() { - return new HedwigSocketAddress(cmd.getOptionValue("defaultHub", - "localhost:4080")); - } - - public boolean isSSLEnabled() { - return false; - } - }; - - InternalLoggerFactory.setDefaultFactory(new Log4JLoggerFactory()); - - HedwigBenchmark app = new HedwigBenchmark(cfg, cmd); - app.call(); - System.exit(0); - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/conf/ClientConfiguration.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/conf/ClientConfiguration.java b/hedwig-client/src/main/java/org/apache/hedwig/client/conf/ClientConfiguration.java deleted file mode 100644 index 836a9d2..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/conf/ClientConfiguration.java +++ /dev/null @@ -1,171 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.client.conf; - -import java.net.InetSocketAddress; - -import org.apache.commons.configuration.ConfigurationException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hedwig.conf.AbstractConfiguration; -import org.apache.hedwig.util.HedwigSocketAddress; - -public class ClientConfiguration extends AbstractConfiguration { - private static final Logger logger = LoggerFactory.getLogger(ClientConfiguration.class); - - // Protected member variables for configuration parameter names - protected static final String DEFAULT_SERVER_HOST = "default_server_host"; - protected static final String MAX_MESSAGE_SIZE = "max_message_size"; - protected static final String MAX_SERVER_REDIRECTS = "max_server_redirects"; - protected static final String AUTO_SEND_CONSUME_MESSAGE_ENABLED = "auto_send_consume_message_enabled"; - protected static final String CONSUMED_MESSAGES_BUFFER_SIZE = "consumed_messages_buffer_size"; - protected static final String MESSAGE_CONSUME_RETRY_WAIT_TIME = "message_consume_retry_wait_time"; - protected static final String SUBSCRIBE_RECONNECT_RETRY_WAIT_TIME = "subscribe_reconnect_retry_wait_time"; - protected static final String MAX_OUTSTANDING_MESSAGES = "max_outstanding_messages"; - protected static final String SERVER_ACK_RESPONSE_TIMEOUT = "server_ack_response_timeout"; - protected static final String TIMEOUT_THREAD_RUN_INTERVAL = "timeout_thread_run_interval"; - protected static final String SSL_ENABLED = "ssl_enabled"; - protected static final String SUBSCRIPTION_MESSAGE_BOUND = "subscription_message_bound"; - protected static final String SUBSCRIPTION_CHANNEL_SHARING_ENABLED = "subscription_channel_sharing_enabled"; - - // Singletons we want to instantiate only once per ClientConfiguration - protected HedwigSocketAddress myDefaultServerAddress = null; - - // Getters for the various Client Configuration parameters. - // This should point to the default server host, or the VIP fronting all of - // the server hubs. This will return the HedwigSocketAddress which - // encapsulates both the regular and SSL port connection to the server host. - protected HedwigSocketAddress getDefaultServerHedwigSocketAddress() { - if (myDefaultServerAddress == null) - myDefaultServerAddress = new HedwigSocketAddress(conf.getString(DEFAULT_SERVER_HOST, "localhost:4080:9876")); - return myDefaultServerAddress; - } - - // This will get the default server InetSocketAddress based on if SSL is - // enabled or not. - public InetSocketAddress getDefaultServerHost() { - if (isSSLEnabled()) - return getDefaultServerHedwigSocketAddress().getSSLSocketAddress(); - else - return getDefaultServerHedwigSocketAddress().getSocketAddress(); - } - - public int getMaximumMessageSize() { - return conf.getInt(MAX_MESSAGE_SIZE, 2 * 1024 * 1024); - } - - // This parameter is for setting the maximum number of server redirects to - // allow before we consider it as an error condition. This is to stop - // infinite redirect loops in case there is a problem with the hub servers - // topic mastership. - public int getMaximumServerRedirects() { - return conf.getInt(MAX_SERVER_REDIRECTS, 2); - } - - // This parameter is a boolean flag indicating if the client library should - // automatically send the consume message to the server based on the - // configured amount of messages consumed by the client app. The client app - // could choose to override this behavior and instead, manually send the - // consume message to the server via the client library using its own - // logic and policy. - public boolean isAutoSendConsumeMessageEnabled() { - return conf.getBoolean(AUTO_SEND_CONSUME_MESSAGE_ENABLED, true); - } - - // This parameter is to set how many consumed messages we'll buffer up - // before we send the Consume message to the server indicating that all - // of the messages up to that point have been successfully consumed by - // the client. - public int getConsumedMessagesBufferSize() { - return conf.getInt(CONSUMED_MESSAGES_BUFFER_SIZE, 5); - } - - // This parameter is used to determine how long we wait before retrying the - // client app's MessageHandler to consume a subscribed messages sent to us - // from the server. The time to wait is in milliseconds. - public long getMessageConsumeRetryWaitTime() { - return conf.getLong(MESSAGE_CONSUME_RETRY_WAIT_TIME, 10000); - } - - // This parameter is used to determine how long we wait before retrying the - // Subscribe Reconnect request. This is done when the connection to a server - // disconnects and we attempt to connect to it. We'll keep on trying but - // in case the server(s) is down for a longer time, we want to throttle - // how often we do the subscribe reconnect request. The time to wait is in - // milliseconds. - public long getSubscribeReconnectRetryWaitTime() { - return conf.getLong(SUBSCRIBE_RECONNECT_RETRY_WAIT_TIME, 10000); - } - - // This parameter is for setting the maximum number of outstanding messages - // the client app can be consuming at a time for topic subscription before - // we throttle things and stop reading from the Netty Channel. - public int getMaximumOutstandingMessages() { - return conf.getInt(MAX_OUTSTANDING_MESSAGES, 10); - } - - // This parameter is used to determine how long we wait (in milliseconds) - // before we time out outstanding PubSubRequests that were written to the - // server successfully but haven't yet received the ack response. - public long getServerAckResponseTimeout() { - return conf.getLong(SERVER_ACK_RESPONSE_TIMEOUT, 30000); - } - - // This parameter is used to determine how often we run the server ack - // response timeout cleaner thread (in milliseconds). - public long getTimeoutThreadRunInterval() { - return conf.getLong(TIMEOUT_THREAD_RUN_INTERVAL, 60000); - } - - // This parameter is a boolean flag indicating if communication with the - // server should be done via SSL for encryption. This is needed for - // cross-colo hub clients listening to non-local servers. - public boolean isSSLEnabled() { - return conf.getBoolean(SSL_ENABLED, false); - } - - /** - * This parameter is a boolean flag indicating if multiplexing subscription - * channels. - */ - public boolean isSubscriptionChannelSharingEnabled() { - return conf.getBoolean(SUBSCRIPTION_CHANNEL_SHARING_ENABLED, false); - } - - /** - * The maximum number of messages the hub will queue for subscriptions - * created using this configuration. The hub will always queue the most - * recent messages. If there are enough publishes to the topic to hit - * the bound, then the oldest messages are dropped from the queue. - * - * A bound of 0 disabled the bound completely. This is the default. - */ - public int getSubscriptionMessageBound() { - return conf.getInt(SUBSCRIPTION_MESSAGE_BOUND, 0); - } - - // Validate that the configuration properties are valid. - public void validate() throws ConfigurationException { - if (isSSLEnabled() && getDefaultServerHedwigSocketAddress().getSSLSocketAddress() == null) { - throw new ConfigurationException("SSL is enabled but a default server SSL port not given!"); - } - // Add other validation checks here - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/data/MessageConsumeData.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/data/MessageConsumeData.java b/hedwig-client/src/main/java/org/apache/hedwig/client/data/MessageConsumeData.java deleted file mode 100644 index 346d74b..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/data/MessageConsumeData.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.client.data; - -import org.apache.hedwig.client.data.TopicSubscriber; -import org.apache.hedwig.protocol.PubSubProtocol.Message; - -/** - * Wrapper class to store all of the data points needed to encapsulate Message - * Consumption in the Subscribe flow for consuming a message sent from the - * server for a given TopicSubscriber. This will be used as the Context in the - * VoidCallback for the MessageHandlers once they've completed consuming the - * message. - * - */ -public class MessageConsumeData { - - // Member variables - public final TopicSubscriber topicSubscriber; - // This is the Message sent from the server for Subscribes for consumption - // by the client. - public final Message msg; - - // Constructor - public MessageConsumeData(final TopicSubscriber topicSubscriber, final Message msg) { - this.topicSubscriber = topicSubscriber; - this.msg = msg; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - if (topicSubscriber != null) { - sb.append("Subscription: ").append(topicSubscriber); - } - if (msg != null) { - sb.append(PubSubData.COMMA).append("Message: ").append(msg); - } - return sb.toString(); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/data/PubSubData.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/data/PubSubData.java b/hedwig-client/src/main/java/org/apache/hedwig/client/data/PubSubData.java deleted file mode 100644 index 63547a0..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/data/PubSubData.java +++ /dev/null @@ -1,179 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.client.data; - -import java.util.List; - -import com.google.protobuf.ByteString; -import org.apache.hedwig.client.netty.HChannel; -import org.apache.hedwig.protocol.PubSubProtocol; -import org.apache.hedwig.protocol.PubSubProtocol.Message; -import org.apache.hedwig.protocol.PubSubProtocol.OperationType; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions; -import org.apache.hedwig.util.Callback; - -/** - * Wrapper class to store all of the data points needed to encapsulate all - * PubSub type of request operations the client will do. This includes knowing - * all of the information needed if we need to redo the publish/subscribe - * request in case of a server redirect. This will be used for all sync/async - * calls, and for all the known types of request messages to send to the server - * hubs: Publish, Subscribe, Unsubscribe, and Consume. - * - */ -public class PubSubData { - // Static string constants - protected static final String COMMA = ", "; - - // Member variables needed during object construction time. - public final ByteString topic; - public final Message msg; - public final ByteString subscriberId; - // Enum to indicate what type of operation this PubSub request data object - // is for. - public final OperationType operationType; - // Options for the subscription - public final SubscriptionOptions options; - - // These two variables are not final since we might override them - // in the case of a Subscribe reconnect. - private Callback<PubSubProtocol.ResponseBody> callback; - - public Object context; - - // Member variables used after object has been constructed. - // List of all servers we've sent the PubSubRequest to successfully. - // This is to keep track of redirected servers that responded back to us. - public List<ByteString> triedServers; - // List of all servers that we've tried to connect or write to but - // was unsuccessful. We'll retry sending the PubSubRequest but will - // quit if we're trying to connect or write to a server that we've - // attempted to previously. - public List<ByteString> connectFailedServers; - public List<ByteString> writeFailedServers; - // Boolean to the hub server indicating if it should claim ownership - // of the topic the PubSubRequest is for. This is mainly used after - // a server redirect. Defaults to false. - public boolean shouldClaim = false; - // TxnID for the PubSubData if it was sent as a PubSubRequest to the hub - // server. This is used in the WriteCallback in case of failure. We want - // to remove it from the ResponseHandler.txn2PubSubData map since the - // failed PubSubRequest will not get an ack response from the server. - // This is set later in the PubSub flows only when we write the actual - // request. Therefore it is not an argument in the constructor. - public long txnId; - // Time in milliseconds using the System.currentTimeMillis() call when the - // PubSubRequest was written on the netty Channel to the server. - public long requestWriteTime; - // For synchronous calls, this variable is used to know when the background - // async process for it has completed, set in the VoidCallback. - public boolean isDone = false; - // Record the original channel for a resubscribe request - private HChannel origChannel = null; - - // Constructor for all types of PubSub request data to send to the server - public PubSubData(final ByteString topic, final Message msg, final ByteString subscriberId, - final OperationType operationType, final SubscriptionOptions options, - final Callback<PubSubProtocol.ResponseBody> callback, - final Object context) { - this.topic = topic; - this.msg = msg; - this.subscriberId = subscriberId; - this.operationType = operationType; - this.options = options; - this.callback = callback; - this.context = context; - } - - public void setCallback(Callback<PubSubProtocol.ResponseBody> callback) { - this.callback = callback; - } - - public Callback<PubSubProtocol.ResponseBody> getCallback() { - return callback; - } - - public void operationFinishedToCallback(Object context, PubSubProtocol.ResponseBody response){ - callback.operationFinished(context, response); - } - - public boolean isResubscribeRequest() { - return null != origChannel; - } - - public HChannel getOriginalChannelForResubscribe() { - return origChannel; - } - - public void setOriginalChannelForResubscribe(HChannel channel) { - this.origChannel = channel; - } - - // Clear all of the stored servers we've contacted or attempted to in this - // request. - public void clearServersList() { - if (triedServers != null) - triedServers.clear(); - if (connectFailedServers != null) - connectFailedServers.clear(); - if (writeFailedServers != null) - writeFailedServers.clear(); - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - if (topic != null) - sb.append("Topic: " + topic.toStringUtf8()); - if (msg != null) - sb.append(COMMA).append("Message: " + msg); - if (subscriberId != null) - sb.append(COMMA).append("SubscriberId: " + subscriberId.toStringUtf8()); - if (operationType != null) - sb.append(COMMA).append("Operation Type: " + operationType.toString()); - if (options != null) - sb.append(COMMA).append("Create Or Attach: " + options.getCreateOrAttach().toString()) - .append(COMMA).append("Message Bound: " + options.getMessageBound()); - if (triedServers != null && triedServers.size() > 0) { - sb.append(COMMA).append("Tried Servers: "); - for (ByteString triedServer : triedServers) { - sb.append(triedServer.toStringUtf8()).append(COMMA); - } - } - if (connectFailedServers != null && connectFailedServers.size() > 0) { - sb.append(COMMA).append("Connect Failed Servers: "); - for (ByteString connectFailedServer : connectFailedServers) { - sb.append(connectFailedServer.toStringUtf8()).append(COMMA); - } - } - if (writeFailedServers != null && writeFailedServers.size() > 0) { - sb.append(COMMA).append("Write Failed Servers: "); - for (ByteString writeFailedServer : writeFailedServers) { - sb.append(writeFailedServer.toStringUtf8()).append(COMMA); - } - } - sb.append(COMMA).append("Should Claim: " + shouldClaim); - if (txnId != 0) - sb.append(COMMA).append("TxnID: " + txnId); - if (requestWriteTime != 0) - sb.append(COMMA).append("Request Write Time: " + requestWriteTime); - sb.append(COMMA).append("Is Done: " + isDone); - return sb.toString(); - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/data/TopicSubscriber.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/data/TopicSubscriber.java b/hedwig-client/src/main/java/org/apache/hedwig/client/data/TopicSubscriber.java deleted file mode 100644 index 064cec1..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/data/TopicSubscriber.java +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.client.data; - -import org.apache.commons.lang.builder.HashCodeBuilder; - -import com.google.protobuf.ByteString; - -/** - * Wrapper class object for the Topic + SubscriberId combination. Since the - * Subscribe flows always use the Topic + SubscriberId as the logical entity, - * we'll create a simple class to encapsulate that. - * - */ -public class TopicSubscriber { - private final ByteString topic; - private final ByteString subscriberId; - private final int hashCode; - - public TopicSubscriber(final ByteString topic, final ByteString subscriberId) { - this.topic = topic; - this.subscriberId = subscriberId; - hashCode = new HashCodeBuilder().append(topic).append(subscriberId).toHashCode(); - } - - @Override - public boolean equals(final Object o) { - if (o == this) - return true; - if (!(o instanceof TopicSubscriber)) - return false; - final TopicSubscriber obj = (TopicSubscriber) o; - return topic.equals(obj.topic) && subscriberId.equals(obj.subscriberId); - } - - @Override - public int hashCode() { - return hashCode; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - if (topic != null) - sb.append("Topic: " + topic.toStringUtf8()); - if (subscriberId != null) - sb.append(PubSubData.COMMA).append("SubscriberId: " + subscriberId.toStringUtf8()); - return sb.toString(); - } - - public ByteString getTopic() { - return topic; - } - - public ByteString getSubscriberId() { - return subscriberId; - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/AlreadyStartDeliveryException.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/AlreadyStartDeliveryException.java b/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/AlreadyStartDeliveryException.java deleted file mode 100644 index 5f468e6..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/AlreadyStartDeliveryException.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.client.exceptions; - -/** - * This is a Hedwig client side exception when the local client wants to - * startDelivery using another message handler before stopping previous one. - */ -public class AlreadyStartDeliveryException extends Exception { - - private static final long serialVersionUID = 873259807218723524L; - - public AlreadyStartDeliveryException(String message) { - super(message); - } - - public AlreadyStartDeliveryException(String message, Throwable t) { - super(message, t); - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/InvalidSubscriberIdException.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/InvalidSubscriberIdException.java b/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/InvalidSubscriberIdException.java deleted file mode 100644 index 3e54356..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/InvalidSubscriberIdException.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.client.exceptions; - -/** - * This is a Hedwig client side exception when the local client wants to do - * subscribe type of operations. Currently, to distinguish between local and hub - * subscribers, the subscriberId will have a specific format. - */ -public class InvalidSubscriberIdException extends Exception { - - private static final long serialVersionUID = 873259807218723523L; - - public InvalidSubscriberIdException(String message) { - super(message); - } - - public InvalidSubscriberIdException(String message, Throwable t) { - super(message, t); - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/NoResponseHandlerException.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/NoResponseHandlerException.java b/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/NoResponseHandlerException.java deleted file mode 100644 index 22b44b1..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/NoResponseHandlerException.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.client.exceptions; - -/** - * This is a Hedwig client side exception thrown when it can't get the response - * handler from the channel pipeline responsible for a PubSubRequest. - */ -public class NoResponseHandlerException extends Exception { - private static final long serialVersionUID = 1L; - - public NoResponseHandlerException(String message) { - super(message); - } - - public NoResponseHandlerException(String message, Throwable t) { - super(message, t); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/ResubscribeException.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/ResubscribeException.java b/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/ResubscribeException.java deleted file mode 100644 index c9aeb38..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/ResubscribeException.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.client.exceptions; - -/** - * This is a Hedwig client side exception when the client failed to resubscribe - * when topic moved or subscription is closed. - */ -public class ResubscribeException extends Exception { - - public ResubscribeException(String message) { - super(message); - } - - public ResubscribeException(String message, Throwable t) { - super(message, t); - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/ServerRedirectLoopException.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/ServerRedirectLoopException.java b/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/ServerRedirectLoopException.java deleted file mode 100644 index da6d4e7..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/ServerRedirectLoopException.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.client.exceptions; - -/** - * This is a Hedwig client side exception when the PubSubRequest is being - * redirected to a server where the request has already been sent to previously. - * To avoid having a cyclical redirect loop, this condition is checked for - * and this exception will be thrown to the client caller. - */ -public class ServerRedirectLoopException extends Exception { - - private static final long serialVersionUID = 98723508723152897L; - - public ServerRedirectLoopException(String message) { - super(message); - } - - public ServerRedirectLoopException(String message, Throwable t) { - super(message, t); - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/TooManyServerRedirectsException.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/TooManyServerRedirectsException.java b/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/TooManyServerRedirectsException.java deleted file mode 100644 index 4a3c99f..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/TooManyServerRedirectsException.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.client.exceptions; - -/** - * This is a Hedwig client side exception when there have been too many server - * redirects during a publish/subscribe call. We only allow a certain number of - * server redirects to find the topic master. If we have exceeded this - * configured amount, the publish/subscribe will fail with this exception. - * - */ -public class TooManyServerRedirectsException extends Exception { - - private static final long serialVersionUID = 2341192937965635310L; - - public TooManyServerRedirectsException(String message) { - super(message); - } - - public TooManyServerRedirectsException(String message, Throwable t) { - super(message, t); - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/AbstractResponseHandler.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/AbstractResponseHandler.java b/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/AbstractResponseHandler.java deleted file mode 100644 index b8a6787..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/AbstractResponseHandler.java +++ /dev/null @@ -1,156 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.client.handlers; - -import java.net.InetSocketAddress; -import java.util.LinkedList; - -import com.google.protobuf.ByteString; - -import org.jboss.netty.channel.Channel; - -import org.apache.hedwig.client.conf.ClientConfiguration; -import org.apache.hedwig.client.data.PubSubData; -import org.apache.hedwig.client.netty.HChannelManager; -import org.apache.hedwig.client.exceptions.ServerRedirectLoopException; -import org.apache.hedwig.client.exceptions.TooManyServerRedirectsException; -import org.apache.hedwig.client.netty.NetUtils; -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.exceptions.PubSubException.ServiceDownException; -import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse; -import org.apache.hedwig.util.HedwigSocketAddress; -import static org.apache.hedwig.util.VarArgs.va; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public abstract class AbstractResponseHandler { - - private final static Logger logger = LoggerFactory.getLogger(AbstractResponseHandler.class); - - protected final ClientConfiguration cfg; - protected final HChannelManager channelManager; - - protected AbstractResponseHandler(ClientConfiguration cfg, - HChannelManager channelManager) { - this.cfg = cfg; - this.channelManager = channelManager; - } - - /** - * Logic to handle received response. - * - * @param response - * PubSubResponse received from hub server. - * @param pubSubData - * PubSubData for the pub/sub request. - * @param channel - * Channel we used to make the request. - */ - public abstract void handleResponse(PubSubResponse response, PubSubData pubSubData, - Channel channel) throws Exception; - - /** - * Logic to repost a PubSubRequest when the server responds with a redirect - * indicating they are not the topic master. - * - * @param response - * PubSubResponse from the server for the redirect - * @param pubSubData - * PubSubData for the original PubSubRequest made - * @param channel - * Channel Channel we used to make the original PubSubRequest - * @throws Exception - * Throws an exception if there was an error in doing the - * redirect repost of the PubSubRequest - */ - protected void handleRedirectResponse(PubSubResponse response, PubSubData pubSubData, - Channel channel) - throws Exception { - if (logger.isDebugEnabled()) { - logger.debug("Handling a redirect from host: {}, response: {}, pubSubData: {}", - va(NetUtils.getHostFromChannel(channel), response, pubSubData)); - } - // In this case, the PubSub request was done to a server that is not - // responsible for the topic. First make sure that we haven't - // exceeded the maximum number of server redirects. - int curNumServerRedirects = (pubSubData.triedServers == null) ? 0 : pubSubData.triedServers.size(); - if (curNumServerRedirects >= cfg.getMaximumServerRedirects()) { - // We've already exceeded the maximum number of server redirects - // so consider this as an error condition for the client. - // Invoke the operationFailed callback and just return. - logger.debug("Exceeded the number of server redirects ({}) so error out.", - curNumServerRedirects); - PubSubException exception = new ServiceDownException( - new TooManyServerRedirectsException("Already reached max number of redirects: " - + curNumServerRedirects)); - pubSubData.getCallback().operationFailed(pubSubData.context, exception); - return; - } - - // We will redirect and try to connect to the correct server - // stored in the StatusMsg of the response. First store the - // server that we sent the PubSub request to for the topic. - ByteString triedServer = ByteString.copyFromUtf8(HedwigSocketAddress.sockAddrStr( - NetUtils.getHostFromChannel(channel))); - if (pubSubData.triedServers == null) { - pubSubData.triedServers = new LinkedList<ByteString>(); - } - pubSubData.shouldClaim = true; - pubSubData.triedServers.add(triedServer); - - // Now get the redirected server host (expected format is - // Hostname:Port:SSLPort) from the server's response message. If one is - // not given for some reason, then redirect to the default server - // host/VIP to repost the request. - String statusMsg = response.getStatusMsg(); - InetSocketAddress redirectedHost; - boolean redirectToDefaultServer; - if (statusMsg != null && statusMsg.length() > 0) { - if (cfg.isSSLEnabled()) { - redirectedHost = new HedwigSocketAddress(statusMsg).getSSLSocketAddress(); - } else { - redirectedHost = new HedwigSocketAddress(statusMsg).getSocketAddress(); - } - redirectToDefaultServer = false; - } else { - redirectedHost = cfg.getDefaultServerHost(); - redirectToDefaultServer = true; - } - - // Make sure the redirected server is not one we've already attempted - // already before in this PubSub request. - if (pubSubData.triedServers.contains(ByteString.copyFromUtf8(HedwigSocketAddress.sockAddrStr(redirectedHost)))) { - logger.error("We've already sent this PubSubRequest before to redirectedHost: {}, pubSubData: {}", - va(redirectedHost, pubSubData)); - PubSubException exception = new ServiceDownException( - new ServerRedirectLoopException("Already made the request before to redirected host: " - + redirectedHost)); - pubSubData.getCallback().operationFailed(pubSubData.context, exception); - return; - } - - // submit the pub/sub request to redirected host - if (redirectToDefaultServer) { - channelManager.submitOpToDefaultServer(pubSubData); - } else { - channelManager.redirectToHost(pubSubData, redirectedHost); - } - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/CloseSubscriptionResponseHandler.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/CloseSubscriptionResponseHandler.java b/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/CloseSubscriptionResponseHandler.java deleted file mode 100644 index c5b58a0..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/CloseSubscriptionResponseHandler.java +++ /dev/null @@ -1,78 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.client.handlers; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.jboss.netty.channel.Channel; - -import org.apache.hedwig.client.conf.ClientConfiguration; -import org.apache.hedwig.client.data.PubSubData; -import org.apache.hedwig.client.netty.HChannelManager; -import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException; -import org.apache.hedwig.exceptions.PubSubException.ServiceDownException; -import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse; - -public class CloseSubscriptionResponseHandler extends AbstractResponseHandler { - - private final static Logger logger = LoggerFactory.getLogger(CloseSubscriptionResponseHandler.class); - - public CloseSubscriptionResponseHandler(ClientConfiguration cfg, - HChannelManager channelManager) { - super(cfg, channelManager); - } - - @Override - public void handleResponse(final PubSubResponse response, final PubSubData pubSubData, - final Channel channel) - throws Exception { - switch (response.getStatusCode()) { - case SUCCESS: - pubSubData.getCallback().operationFinished(pubSubData.context, null); - break; - case CLIENT_NOT_SUBSCRIBED: - // For closesubscription requests, the server says that the client was - // never subscribed to the topic. - pubSubData.getCallback().operationFailed(pubSubData.context, new ClientNotSubscribedException( - "Client was never subscribed to topic: " + - pubSubData.topic.toStringUtf8() + ", subscriberId: " + - pubSubData.subscriberId.toStringUtf8())); - break; - case SERVICE_DOWN: - // Response was service down failure so just invoke the callback's - // operationFailed method. - pubSubData.getCallback().operationFailed(pubSubData.context, new ServiceDownException( - "Server responded with a SERVICE_DOWN status")); - break; - case NOT_RESPONSIBLE_FOR_TOPIC: - // Redirect response so we'll need to repost the original - // Unsubscribe Request - handleRedirectResponse(response, pubSubData, channel); - break; - default: - // Consider all other status codes as errors, operation failed - // cases. - logger.error("Unexpected error response from server for PubSubResponse: " + response); - pubSubData.getCallback().operationFailed(pubSubData.context, new ServiceDownException( - "Server responded with a status code of: " + - response.getStatusCode())); - break; - } - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/MessageConsumeCallback.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/MessageConsumeCallback.java b/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/MessageConsumeCallback.java deleted file mode 100644 index 32517b0..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/MessageConsumeCallback.java +++ /dev/null @@ -1,117 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.client.handlers; - -import java.util.TimerTask; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hedwig.client.conf.ClientConfiguration; -import org.apache.hedwig.client.data.MessageConsumeData; -import org.apache.hedwig.client.netty.HChannelManager; -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.protoextensions.MessageIdUtils; -import org.apache.hedwig.util.Callback; -import static org.apache.hedwig.util.VarArgs.va; - -/** - * This is the Callback used by the MessageHandlers on the client app when - * they've finished consuming a subscription message sent from the server - * asynchronously. This callback back to the client libs will be stateless so we - * can use a singleton for the class. The object context used should be the - * MessageConsumeData type. That will contain all of the information needed to - * call the message consume logic in the client lib HChannelHandler. - * - */ -public class MessageConsumeCallback implements Callback<Void> { - - private static final Logger logger = LoggerFactory.getLogger(MessageConsumeCallback.class); - - private final HChannelManager channelManager; - private final long consumeRetryWaitTime; - - public MessageConsumeCallback(ClientConfiguration cfg, - HChannelManager channelManager) { - this.channelManager = channelManager; - this.consumeRetryWaitTime = - cfg.getMessageConsumeRetryWaitTime(); - } - - class MessageConsumeRetryTask extends TimerTask { - private final MessageConsumeData messageConsumeData; - - public MessageConsumeRetryTask(MessageConsumeData messageConsumeData) { - this.messageConsumeData = messageConsumeData; - } - - @Override - public void run() { - // Try to consume the message again - SubscribeResponseHandler subscribeHChannelHandler = - channelManager.getSubscribeResponseHandler(messageConsumeData.topicSubscriber); - if (null == subscribeHChannelHandler || - !subscribeHChannelHandler.hasSubscription(messageConsumeData.topicSubscriber)) { - logger.warn("No subscription {} found to retry delivering message {}.", - va(messageConsumeData.topicSubscriber, - MessageIdUtils.msgIdToReadableString(messageConsumeData.msg.getMsgId()))); - return; - } - - subscribeHChannelHandler.asyncMessageDeliver(messageConsumeData.topicSubscriber, - messageConsumeData.msg); - } - } - - public void operationFinished(Object ctx, Void resultOfOperation) { - MessageConsumeData messageConsumeData = (MessageConsumeData) ctx; - - SubscribeResponseHandler subscribeHChannelHandler = - channelManager.getSubscribeResponseHandler(messageConsumeData.topicSubscriber); - if (null == subscribeHChannelHandler || - !subscribeHChannelHandler.hasSubscription(messageConsumeData.topicSubscriber)) { - logger.warn("No subscription {} found to consume message {}.", - va(messageConsumeData.topicSubscriber, - MessageIdUtils.msgIdToReadableString(messageConsumeData.msg.getMsgId()))); - return; - } - - // Message has been successfully consumed by the client app so callback - // to the HChannelHandler indicating that the message is consumed. - subscribeHChannelHandler.messageConsumed(messageConsumeData.topicSubscriber, - messageConsumeData.msg); - } - - public void operationFailed(Object ctx, PubSubException exception) { - // Message has NOT been successfully consumed by the client app so - // callback to the HChannelHandler to try the async MessageHandler - // Consume logic again. - MessageConsumeData messageConsumeData = (MessageConsumeData) ctx; - logger.error("Message was not consumed successfully by client MessageHandler: {}", - messageConsumeData); - - // Sleep a pre-configured amount of time (in milliseconds) before we - // do the retry. In the future, we can have more dynamic logic on - // what duration to sleep based on how many times we've retried, or - // perhaps what the last amount of time we slept was. We could stick - // some of this meta-data into the MessageConsumeData when we retry. - channelManager.schedule(new MessageConsumeRetryTask(messageConsumeData), - consumeRetryWaitTime); - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/PubSubCallback.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/PubSubCallback.java b/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/PubSubCallback.java deleted file mode 100644 index 3f5fbcd..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/PubSubCallback.java +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.client.handlers; - -import org.apache.hedwig.protocol.PubSubProtocol; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hedwig.client.data.PubSubData; -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.util.Callback; - -/** - * This class is used when we are doing synchronous type of operations. All - * underlying client ops in Hedwig are async so this is just a way to make the - * async calls synchronous. - * - */ -public class PubSubCallback implements Callback<PubSubProtocol.ResponseBody> { - - private static final Logger logger = LoggerFactory.getLogger(PubSubCallback.class); - - // Private member variables - private final PubSubData pubSubData; - // Boolean indicator to see if the sync PubSub call was successful or not. - private boolean isCallSuccessful; - // For sync callbacks, we'd like to know what the PubSubException is thrown - // on failure. This is so we can have a handle to the exception and rethrow - // it later. - private PubSubException failureException; - - private PubSubProtocol.ResponseBody responseBody; - - // Constructor - public PubSubCallback(PubSubData pubSubData) { - this.pubSubData = pubSubData; - } - - public void operationFinished(Object ctx, PubSubProtocol.ResponseBody resultOfOperation) { - logger.debug("PubSub call succeeded for pubSubData: {}", pubSubData); - // Wake up the main sync PubSub thread that is waiting for us to - // complete. - synchronized (pubSubData) { - this.responseBody = resultOfOperation; - isCallSuccessful = true; - pubSubData.isDone = true; - pubSubData.notify(); - } - } - - public void operationFailed(Object ctx, PubSubException exception) { - logger.debug("PubSub call failed with exception: {}, pubSubData: {}", exception, pubSubData); - // Wake up the main sync PubSub thread that is waiting for us to - // complete. - synchronized (pubSubData) { - isCallSuccessful = false; - failureException = exception; - pubSubData.isDone = true; - pubSubData.notify(); - } - } - - // Public getter to determine if the PubSub callback is successful or not - // based on the PubSub ack response from the server. - public boolean getIsCallSuccessful() { - return isCallSuccessful; - } - - // Public getter to retrieve what the PubSubException was that occurred when - // the operation failed. - public PubSubException getFailureException() { - return failureException; - } - - - public PubSubProtocol.ResponseBody getResponseBody() { - return responseBody; - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/PublishResponseHandler.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/PublishResponseHandler.java b/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/PublishResponseHandler.java deleted file mode 100644 index 8b2e9d1..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/PublishResponseHandler.java +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.client.handlers; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.jboss.netty.channel.Channel; -import org.apache.hedwig.client.conf.ClientConfiguration; -import org.apache.hedwig.client.data.PubSubData; -import org.apache.hedwig.client.netty.HChannelManager; -import org.apache.hedwig.exceptions.PubSubException.ServiceDownException; -import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse; - -public class PublishResponseHandler extends AbstractResponseHandler { - - private static final Logger logger = LoggerFactory.getLogger(PublishResponseHandler.class); - - public PublishResponseHandler(ClientConfiguration cfg, - HChannelManager channelManager) { - super(cfg, channelManager); - } - - @Override - public void handleResponse(PubSubResponse response, PubSubData pubSubData, - Channel channel) throws Exception { - switch (response.getStatusCode()) { - case SUCCESS: - // Response was success so invoke the callback's operationFinished - // method. - pubSubData.operationFinishedToCallback(pubSubData.context, - response.hasResponseBody() ? response.getResponseBody() : null); - break; - case SERVICE_DOWN: - // Response was service down failure so just invoke the callback's - // operationFailed method. - pubSubData.getCallback().operationFailed(pubSubData.context, new ServiceDownException( - "Server responded with a SERVICE_DOWN status")); - break; - case NOT_RESPONSIBLE_FOR_TOPIC: - // Redirect response so we'll need to repost the original Publish - // Request - handleRedirectResponse(response, pubSubData, channel); - break; - default: - // Consider all other status codes as errors, operation failed - // cases. - logger.error("Unexpected error response from server for PubSubResponse: " + response); - pubSubData.getCallback().operationFailed(pubSubData.context, new ServiceDownException( - "Server responded with a status code of: " + - response.getStatusCode())); - break; - } - } -}