http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java
----------------------------------------------------------------------
diff --git 
a/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java
 
b/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java
deleted file mode 100644
index 38dfc3c..0000000
--- 
a/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.benchmark;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.Callable;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.protobuf.ByteString;
-
-import org.apache.bookkeeper.util.MathUtils;
-import org.apache.hedwig.client.api.MessageHandler;
-import org.apache.hedwig.client.api.Subscriber;
-import org.apache.hedwig.client.benchmark.BenchmarkUtils.BenchmarkCallback;
-import org.apache.hedwig.client.benchmark.BenchmarkUtils.ThroughputAggregator;
-import 
org.apache.hedwig.client.benchmark.BenchmarkUtils.ThroughputLatencyAggregator;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.RegionSpecificSeqId;
-import 
org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
-import org.apache.hedwig.util.Callback;
-
-public class BenchmarkSubscriber extends BenchmarkWorker implements 
Callable<Void> {
-    private static final Logger logger = 
LoggerFactory.getLogger(BenchmarkSubscriber.class);
-    Subscriber subscriber;
-    ByteString subId;
-
-
-    public BenchmarkSubscriber(int numTopics, int numMessages, int numRegions,
-                               int startTopicLabel, int partitionIndex, int 
numPartitions, Subscriber subscriber, ByteString subId) {
-        super(numTopics, numMessages, numRegions, startTopicLabel, 
partitionIndex, numPartitions);
-        this.subscriber = subscriber;
-        this.subId = subId;
-    }
-
-    public void warmup(int numWarmup) throws InterruptedException {
-        /*
-         * multiplying the number of ops by numParitions because we end up
-         * skipping many because of the partitioning logic
-         */
-        multiSub("warmup", "warmup", 0, numWarmup, numWarmup * numPartitions);
-    }
-
-    public Void call() throws Exception {
-
-        final ThroughputAggregator agg = new ThroughputAggregator("recvs", 
numMessages);
-        agg.startProgress();
-
-        final Map<String, Long> lastSeqIdSeenMap = new HashMap<String, Long>();
-
-        for (int i = startTopicLabel; i < startTopicLabel + numTopics; i++) {
-
-            if (!HedwigBenchmark.amIResponsibleForTopic(i, partitionIndex, 
numPartitions)) {
-                continue;
-            }
-
-            final String topic = HedwigBenchmark.TOPIC_PREFIX + i;
-
-            SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-                .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-            subscriber.subscribe(ByteString.copyFromUtf8(topic), subId, opts);
-            subscriber.startDelivery(ByteString.copyFromUtf8(topic), subId, 
new MessageHandler() {
-
-                @Override
-                public void deliver(ByteString thisTopic, ByteString 
subscriberId, Message msg,
-                Callback<Void> callback, Object context) {
-                    logger.debug("Got message from src-region: {} with seq-id: 
{}",
-                        msg.getSrcRegion(), msg.getMsgId());
-
-                    String mapKey = topic + msg.getSrcRegion().toStringUtf8();
-                    Long lastSeqIdSeen = lastSeqIdSeenMap.get(mapKey);
-                    if (lastSeqIdSeen == null) {
-                        lastSeqIdSeen = (long) 0;
-                    }
-
-                    if (getSrcSeqId(msg) <= lastSeqIdSeen) {
-                        logger.info("Redelivery of message, src-region: " + 
msg.getSrcRegion() + "seq-id: "
-                                    + msg.getMsgId());
-                    } else {
-                        agg.ding(false);
-                    }
-
-                    callback.operationFinished(context, null);
-                }
-            });
-        }
-        System.out.println("Finished subscribing to topics and now waiting for 
messages to come in...");
-        // Wait till the benchmark test has completed
-        agg.queue.take();
-        System.out.println(agg.summarize(agg.earliest.get()));
-        return null;
-    }
-
-    long getSrcSeqId(Message msg) {
-        if (msg.getMsgId().getRemoteComponentsCount() == 0) {
-            return msg.getMsgId().getLocalComponent();
-        }
-
-        for (RegionSpecificSeqId rseqId : 
msg.getMsgId().getRemoteComponentsList()) {
-            if (rseqId.getRegion().equals(msg.getSrcRegion()))
-                return rseqId.getSeqId();
-        }
-
-        return msg.getMsgId().getLocalComponent();
-    }
-
-    void multiSub(String label, String topicPrefix, int start, final int npar, 
final int count)
-            throws InterruptedException {
-        long startTime = MathUtils.now();
-        ThroughputLatencyAggregator agg = new 
ThroughputLatencyAggregator(label, count / numPartitions, npar);
-        agg.startProgress();
-
-        int end = start + count;
-        for (int i = start; i < end; ++i) {
-            if (!HedwigBenchmark.amIResponsibleForTopic(i, partitionIndex, 
numPartitions)) {
-                continue;
-            }
-            SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-                .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-            subscriber.asyncSubscribe(ByteString.copyFromUtf8(topicPrefix + i),
-                                      subId, opts,
-                                      new BenchmarkCallback(agg), null);
-        }
-        // Wait till the benchmark test has completed
-        agg.tpAgg.queue.take();
-        if (count > 1)
-            System.out.println(agg.summarize(startTime));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkUtils.java
----------------------------------------------------------------------
diff --git 
a/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkUtils.java
 
b/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkUtils.java
deleted file mode 100644
index 3efe22d..0000000
--- 
a/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkUtils.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.benchmark;
-
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.bookkeeper.util.MathUtils;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.util.Callback;
-
-public class BenchmarkUtils {
-    static final Logger logger = LoggerFactory.getLogger(BenchmarkUtils.class);
-
-    public static double calcTp(final int count, long startTime) {
-        return 1000. * count / (MathUtils.now() - startTime);
-    }
-
-    /**
-     * Stats aggregator for callback (round-trip) operations. Measures both
-     * throughput and latency.
-     */
-    public static class ThroughputLatencyAggregator {
-        int numBuckets;
-        final ThroughputAggregator tpAgg;
-        final Semaphore outstanding;
-        final AtomicLong sum = new AtomicLong();
-
-        final AtomicLong[] latencyBuckets;
-
-        // bucket[i] is count of number of operations that took >= i ms and <
-        // (i+1) ms.
-
-        public ThroughputLatencyAggregator(String label, int count, int limit) 
throws InterruptedException {
-            numBuckets = Integer.getInteger("numBuckets", 101);
-            latencyBuckets = new AtomicLong[numBuckets];
-            tpAgg = new ThroughputAggregator(label, count);
-            outstanding = new Semaphore(limit);
-            for (int i = 0; i < numBuckets; i++) {
-                latencyBuckets[i] = new AtomicLong();
-            }
-        }
-
-        public void startProgress() {
-            tpAgg.startProgress();
-        }
-
-        public void reportLatency(long latency) {
-            sum.addAndGet(latency);
-
-            int bucketIndex;
-            if (latency >= numBuckets) {
-                bucketIndex = (int) numBuckets - 1;
-            } else {
-                bucketIndex = (int) latency;
-            }
-            latencyBuckets[bucketIndex].incrementAndGet();
-        }
-
-        private String getPercentile(double percentile) {
-            int numInliersNeeded = (int) (percentile / 100 * tpAgg.count);
-            int numInliersFound = 0;
-            for (int i = 0; i < numBuckets - 1; i++) {
-                numInliersFound += latencyBuckets[i].intValue();
-                if (numInliersFound > numInliersNeeded) {
-                    return i + "";
-                }
-            }
-            return " >= " + (numBuckets - 1);
-        }
-
-        public String summarize(long startTime) {
-            double percentile = 
Double.parseDouble(System.getProperty("percentile", "99.9"));
-            return tpAgg.summarize(startTime) + ", avg latency = " + sum.get() 
/ tpAgg.count + ", " + percentile
-                   + "%ile latency = " + getPercentile(percentile);
-        }
-    }
-
-    /**
-     * Stats aggregator for non-callback (single-shot) operations. Measures 
just
-     * throughput.
-     */
-    public static class ThroughputAggregator {
-        final String label;
-        final int count;
-        final AtomicInteger done = new AtomicInteger();
-        final AtomicLong earliest = new AtomicLong();
-        final AtomicInteger numFailed = new AtomicInteger();
-        final Thread progressThread;
-        final LinkedBlockingQueue<Integer> queue = new 
LinkedBlockingQueue<Integer>();
-
-        public ThroughputAggregator(final String label, final int count) {
-            this.label = label;
-            this.count = count;
-            if (count == 0)
-                queue.add(0);
-            if (Boolean.getBoolean("progress")) {
-                progressThread = new Thread(new Runnable() {
-                    @Override
-                    public void run() {
-                        try {
-                            for (int doneSnap = 0, prev = 0; doneSnap < count; 
prev = doneSnap, doneSnap = done.get()) {
-                                if (doneSnap > prev) {
-                                    System.out.println(label + " progress: " + 
doneSnap + " of " + count);
-                                }
-                                Thread.sleep(1000);
-                            }
-                        } catch (Exception ex) {
-                            throw new RuntimeException(ex);
-                        }
-                    }
-                    });
-            } else {
-                progressThread = null;
-            }
-        }
-
-        public void startProgress() {
-            if (progressThread != null) {
-                progressThread.start();
-            }
-        }
-
-        public void ding(boolean failed) {
-            int snapDone = done.incrementAndGet();
-            earliest.compareAndSet(0, MathUtils.now());
-            if (failed)
-                numFailed.incrementAndGet();
-            if (logger.isDebugEnabled())
-                logger.debug(label + " " + (failed ? "failed" : "succeeded") + 
", done so far = " + snapDone);
-            if (snapDone == count) {
-                queue.add(numFailed.get());
-            }
-        }
-
-        public String summarize(long startTime) {
-            return "Finished " + label + ": count = " + done.get() + ", tput = 
" + calcTp(count, startTime)
-                   + " ops/s, numFailed = " + numFailed;
-        }
-    }
-
-    public static class BenchmarkCallback implements Callback<Void> {
-
-        final ThroughputLatencyAggregator agg;
-        final long startTime;
-
-        public BenchmarkCallback(ThroughputLatencyAggregator agg) throws 
InterruptedException {
-            this.agg = agg;
-            agg.outstanding.acquire();
-            // Must set the start time *after* taking acquiring on outstanding.
-            startTime = MathUtils.now();
-        }
-
-        private void finish(boolean failed) {
-            agg.reportLatency(MathUtils.now() - startTime);
-            agg.tpAgg.ding(failed);
-            agg.outstanding.release();
-        }
-
-        @Override
-        public void operationFinished(Object ctx, Void resultOfOperation) {
-            finish(false);
-        }
-
-        @Override
-        public void operationFailed(Object ctx, PubSubException exception) {
-            finish(true);
-        }
-    };
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkWorker.java
----------------------------------------------------------------------
diff --git 
a/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkWorker.java
 
b/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkWorker.java
deleted file mode 100644
index e7b15f2..0000000
--- 
a/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkWorker.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.benchmark;
-
-public class BenchmarkWorker {
-    int numTopics;
-    int numMessages;
-    int numRegions;
-    int startTopicLabel;
-    int partitionIndex;
-    int numPartitions;
-
-    public BenchmarkWorker(int numTopics, int numMessages, int numRegions,
-                           int startTopicLabel, int partitionIndex, int 
numPartitions) {
-        this.numTopics = numTopics;
-        this.numMessages = numMessages;
-        this.numRegions = numRegions;
-        this.startTopicLabel = startTopicLabel;
-        this.partitionIndex = partitionIndex;
-        this.numPartitions = numPartitions;
-
-        if (numMessages % (numTopics * numRegions) != 0) {
-            throw new RuntimeException("Number of messages not equally 
divisible among regions and topics");
-        }
-
-        if (numTopics % numPartitions != 0) {
-            throw new RuntimeException("Number of topics not equally divisible 
among partitions");
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/HedwigBenchmark.java
----------------------------------------------------------------------
diff --git 
a/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/HedwigBenchmark.java
 
b/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/HedwigBenchmark.java
deleted file mode 100644
index cc5e937..0000000
--- 
a/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/HedwigBenchmark.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.benchmark;
-
-import java.io.File;
-import java.util.concurrent.Callable;
-
-import org.apache.commons.configuration.ConfigurationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.logging.InternalLoggerFactory;
-import org.jboss.netty.logging.Log4JLoggerFactory;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.util.HedwigSocketAddress;
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.HedwigClient;
-import org.apache.hedwig.client.api.Publisher;
-import org.apache.hedwig.client.api.Subscriber;
-
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.PosixParser;
-import org.apache.commons.cli.ParseException;
-
-public class HedwigBenchmark implements Callable<Void> {
-    protected static final Logger logger = 
LoggerFactory.getLogger(HedwigBenchmark.class);
-
-    static final String TOPIC_PREFIX = "topic";
-
-    private final HedwigClient client;
-    private final Publisher publisher;
-    private final Subscriber subscriber;
-    private final CommandLine cmd;
-
-    public HedwigBenchmark(ClientConfiguration cfg, CommandLine cmd) {
-        client = new HedwigClient(cfg);
-        publisher = client.getPublisher();
-        subscriber = client.getSubscriber();
-        this.cmd = cmd;
-    }
-
-    static boolean amIResponsibleForTopic(int topicNum, int partitionIndex, 
int numPartitions) {
-        return topicNum % numPartitions == partitionIndex;
-    }
-
-    @Override
-    public Void call() throws Exception {
-
-        //
-        // Parameters.
-        //
-
-        // What program to run: pub, sub (subscription benchmark), recv.
-        final String mode = cmd.getOptionValue("mode","");
-
-        // Number of requests to make (publishes or subscribes).
-        int numTopics = Integer.valueOf(cmd.getOptionValue("nTopics", "50"));
-        int numMessages = Integer.valueOf(cmd.getOptionValue("nMsgs", "1000"));
-        int numRegions = Integer.valueOf(cmd.getOptionValue("nRegions", "1"));
-        int startTopicLabel = 
Integer.valueOf(cmd.getOptionValue("startTopicLabel", "0"));
-        int partitionIndex = 
Integer.valueOf(cmd.getOptionValue("partitionIndex", "0"));
-        int numPartitions = Integer.valueOf(cmd.getOptionValue("nPartitions", 
"1"));
-
-        int replicaIndex = Integer.valueOf(cmd.getOptionValue("replicaIndex", 
"0"));
-
-        int rate = Integer.valueOf(cmd.getOptionValue("rate", "0"));
-        int nParallel = Integer.valueOf(cmd.getOptionValue("npar", "100"));
-        int msgSize = Integer.valueOf(cmd.getOptionValue("msgSize", "1024"));
-
-        // Number of warmup subscriptions to make.
-        final int nWarmups = Integer.valueOf(cmd.getOptionValue("nwarmups", 
"1000"));
-
-        if (mode.equals("sub")) {
-            BenchmarkSubscriber benchmarkSub = new 
BenchmarkSubscriber(numTopics, 0, 1, startTopicLabel, 0, 1,
-                    subscriber, ByteString.copyFromUtf8("mySub"));
-
-            benchmarkSub.warmup(nWarmups);
-            benchmarkSub.call();
-
-        } else if (mode.equals("recv")) {
-
-            BenchmarkSubscriber benchmarkSub = new 
BenchmarkSubscriber(numTopics, numMessages, numRegions,
-                    startTopicLabel, partitionIndex, numPartitions, 
subscriber, ByteString.copyFromUtf8("sub-"
-                            + replicaIndex));
-
-            benchmarkSub.call();
-
-        } else if (mode.equals("pub")) {
-            // Offered load in msgs/second.
-            BenchmarkPublisher benchmarkPub = new 
BenchmarkPublisher(numTopics, numMessages, numRegions,
-                    startTopicLabel, partitionIndex, numPartitions, publisher, 
subscriber, msgSize, nParallel, rate);
-            benchmarkPub.warmup(nWarmups);
-            benchmarkPub.call();
-
-        } else {
-            throw new Exception("unknown mode: " + mode);
-        }
-
-        return null;
-    }
-
-    public static void main(String[] args) throws Exception {
-        Options options = new Options();
-        options.addOption("mode", true, "sub, recv, or pub");
-        options.addOption("nTopics", true, "Number of topics, default 50");
-        options.addOption("nMsgs", true, "Number of messages, default 1000");
-        options.addOption("nRegions", true, "Number of regsions, default 1");
-        options.addOption("startTopicLabel", true,
-                          "Prefix of topic labels. Must be numeric. Default 
0");
-        options.addOption("partitionIndex", true, "If partitioning, the 
partition index for this client");
-        options.addOption("nPartitions", true, "Number of partitions, default 
1");
-        options.addOption("replicaIndex", true, "default 0");
-        options.addOption("rate", true, "default 0");
-        options.addOption("npar", true, "default 100");
-        options.addOption("msgSize", true, "Size of messages, default 1024");
-        options.addOption("nwarmups", true, "Number of warmup messages, 
default 1000");
-        options.addOption("defaultHub", true, "Default hedwig hub to connect 
to, default localhost:4080");
-
-        CommandLineParser parser = new PosixParser();
-        final CommandLine cmd = parser.parse(options, args);
-
-        if (cmd.hasOption("help")) {
-            HelpFormatter formatter = new HelpFormatter();
-            formatter.printHelp("HedwigBenchmark <options>", options);
-            System.exit(-1);
-        }
-
-        ClientConfiguration cfg = new ClientConfiguration() {
-                public HedwigSocketAddress 
getDefaultServerHedwigSocketAddress() {
-                    return new 
HedwigSocketAddress(cmd.getOptionValue("defaultHub",
-                                                                      
"localhost:4080"));
-                }
-
-                public boolean isSSLEnabled() {
-                    return false;
-                }
-            };
-
-        InternalLoggerFactory.setDefaultFactory(new Log4JLoggerFactory());
-
-        HedwigBenchmark app = new HedwigBenchmark(cfg, cmd);
-        app.call();
-        System.exit(0);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/conf/ClientConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hedwig-client/src/main/java/org/apache/hedwig/client/conf/ClientConfiguration.java
 
b/hedwig-client/src/main/java/org/apache/hedwig/client/conf/ClientConfiguration.java
deleted file mode 100644
index 836a9d2..0000000
--- 
a/hedwig-client/src/main/java/org/apache/hedwig/client/conf/ClientConfiguration.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.conf;
-
-import java.net.InetSocketAddress;
-
-import org.apache.commons.configuration.ConfigurationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hedwig.conf.AbstractConfiguration;
-import org.apache.hedwig.util.HedwigSocketAddress;
-
-public class ClientConfiguration extends AbstractConfiguration {
-    private static final Logger logger = 
LoggerFactory.getLogger(ClientConfiguration.class);
-
-    // Protected member variables for configuration parameter names
-    protected static final String DEFAULT_SERVER_HOST = "default_server_host";
-    protected static final String MAX_MESSAGE_SIZE = "max_message_size";
-    protected static final String MAX_SERVER_REDIRECTS = 
"max_server_redirects";
-    protected static final String AUTO_SEND_CONSUME_MESSAGE_ENABLED = 
"auto_send_consume_message_enabled";
-    protected static final String CONSUMED_MESSAGES_BUFFER_SIZE = 
"consumed_messages_buffer_size";
-    protected static final String MESSAGE_CONSUME_RETRY_WAIT_TIME = 
"message_consume_retry_wait_time";
-    protected static final String SUBSCRIBE_RECONNECT_RETRY_WAIT_TIME = 
"subscribe_reconnect_retry_wait_time";
-    protected static final String MAX_OUTSTANDING_MESSAGES = 
"max_outstanding_messages";
-    protected static final String SERVER_ACK_RESPONSE_TIMEOUT = 
"server_ack_response_timeout";
-    protected static final String TIMEOUT_THREAD_RUN_INTERVAL = 
"timeout_thread_run_interval";
-    protected static final String SSL_ENABLED = "ssl_enabled";
-    protected static final String SUBSCRIPTION_MESSAGE_BOUND = 
"subscription_message_bound";
-    protected static final String SUBSCRIPTION_CHANNEL_SHARING_ENABLED = 
"subscription_channel_sharing_enabled";
-
-    // Singletons we want to instantiate only once per ClientConfiguration
-    protected HedwigSocketAddress myDefaultServerAddress = null;
-
-    // Getters for the various Client Configuration parameters.
-    // This should point to the default server host, or the VIP fronting all of
-    // the server hubs. This will return the HedwigSocketAddress which
-    // encapsulates both the regular and SSL port connection to the server 
host.
-    protected HedwigSocketAddress getDefaultServerHedwigSocketAddress() {
-        if (myDefaultServerAddress == null)
-            myDefaultServerAddress = new 
HedwigSocketAddress(conf.getString(DEFAULT_SERVER_HOST, "localhost:4080:9876"));
-        return myDefaultServerAddress;
-    }
-
-    // This will get the default server InetSocketAddress based on if SSL is
-    // enabled or not.
-    public InetSocketAddress getDefaultServerHost() {
-        if (isSSLEnabled())
-            return getDefaultServerHedwigSocketAddress().getSSLSocketAddress();
-        else
-            return getDefaultServerHedwigSocketAddress().getSocketAddress();
-    }
-
-    public int getMaximumMessageSize() {
-        return conf.getInt(MAX_MESSAGE_SIZE, 2 * 1024 * 1024);
-    }
-
-    // This parameter is for setting the maximum number of server redirects to
-    // allow before we consider it as an error condition. This is to stop
-    // infinite redirect loops in case there is a problem with the hub servers
-    // topic mastership.
-    public int getMaximumServerRedirects() {
-        return conf.getInt(MAX_SERVER_REDIRECTS, 2);
-    }
-
-    // This parameter is a boolean flag indicating if the client library should
-    // automatically send the consume message to the server based on the
-    // configured amount of messages consumed by the client app. The client app
-    // could choose to override this behavior and instead, manually send the
-    // consume message to the server via the client library using its own
-    // logic and policy.
-    public boolean isAutoSendConsumeMessageEnabled() {
-        return conf.getBoolean(AUTO_SEND_CONSUME_MESSAGE_ENABLED, true);
-    }
-
-    // This parameter is to set how many consumed messages we'll buffer up
-    // before we send the Consume message to the server indicating that all
-    // of the messages up to that point have been successfully consumed by
-    // the client.
-    public int getConsumedMessagesBufferSize() {
-        return conf.getInt(CONSUMED_MESSAGES_BUFFER_SIZE, 5);
-    }
-
-    // This parameter is used to determine how long we wait before retrying the
-    // client app's MessageHandler to consume a subscribed messages sent to us
-    // from the server. The time to wait is in milliseconds.
-    public long getMessageConsumeRetryWaitTime() {
-        return conf.getLong(MESSAGE_CONSUME_RETRY_WAIT_TIME, 10000);
-    }
-
-    // This parameter is used to determine how long we wait before retrying the
-    // Subscribe Reconnect request. This is done when the connection to a 
server
-    // disconnects and we attempt to connect to it. We'll keep on trying but
-    // in case the server(s) is down for a longer time, we want to throttle
-    // how often we do the subscribe reconnect request. The time to wait is in
-    // milliseconds.
-    public long getSubscribeReconnectRetryWaitTime() {
-        return conf.getLong(SUBSCRIBE_RECONNECT_RETRY_WAIT_TIME, 10000);
-    }
-
-    // This parameter is for setting the maximum number of outstanding messages
-    // the client app can be consuming at a time for topic subscription before
-    // we throttle things and stop reading from the Netty Channel.
-    public int getMaximumOutstandingMessages() {
-        return conf.getInt(MAX_OUTSTANDING_MESSAGES, 10);
-    }
-
-    // This parameter is used to determine how long we wait (in milliseconds)
-    // before we time out outstanding PubSubRequests that were written to the
-    // server successfully but haven't yet received the ack response.
-    public long getServerAckResponseTimeout() {
-        return conf.getLong(SERVER_ACK_RESPONSE_TIMEOUT, 30000);
-    }
-
-    // This parameter is used to determine how often we run the server ack
-    // response timeout cleaner thread (in milliseconds).
-    public long getTimeoutThreadRunInterval() {
-        return conf.getLong(TIMEOUT_THREAD_RUN_INTERVAL, 60000);
-    }
-
-    // This parameter is a boolean flag indicating if communication with the
-    // server should be done via SSL for encryption. This is needed for
-    // cross-colo hub clients listening to non-local servers.
-    public boolean isSSLEnabled() {
-        return conf.getBoolean(SSL_ENABLED, false);
-    }
-
-    /**
-     * This parameter is a boolean flag indicating if multiplexing subscription
-     * channels.
-     */
-    public boolean isSubscriptionChannelSharingEnabled() {
-        return conf.getBoolean(SUBSCRIPTION_CHANNEL_SHARING_ENABLED, false);
-    }
-
-    /**
-     * The maximum number of messages the hub will queue for subscriptions
-     * created using this configuration. The hub will always queue the most
-     * recent messages. If there are enough publishes to the topic to hit
-     * the bound, then the oldest messages are dropped from the queue.
-     *
-     * A bound of 0 disabled the bound completely. This is the default.
-     */
-    public int getSubscriptionMessageBound() {
-        return conf.getInt(SUBSCRIPTION_MESSAGE_BOUND, 0);
-    }
-
-    // Validate that the configuration properties are valid.
-    public void validate() throws ConfigurationException {
-        if (isSSLEnabled() && 
getDefaultServerHedwigSocketAddress().getSSLSocketAddress() == null) {
-            throw new ConfigurationException("SSL is enabled but a default 
server SSL port not given!");
-        }
-        // Add other validation checks here
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/data/MessageConsumeData.java
----------------------------------------------------------------------
diff --git 
a/hedwig-client/src/main/java/org/apache/hedwig/client/data/MessageConsumeData.java
 
b/hedwig-client/src/main/java/org/apache/hedwig/client/data/MessageConsumeData.java
deleted file mode 100644
index 346d74b..0000000
--- 
a/hedwig-client/src/main/java/org/apache/hedwig/client/data/MessageConsumeData.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.data;
-
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-
-/**
- * Wrapper class to store all of the data points needed to encapsulate Message
- * Consumption in the Subscribe flow for consuming a message sent from the
- * server for a given TopicSubscriber. This will be used as the Context in the
- * VoidCallback for the MessageHandlers once they've completed consuming the
- * message.
- *
- */
-public class MessageConsumeData {
-
-    // Member variables
-    public final TopicSubscriber topicSubscriber;
-    // This is the Message sent from the server for Subscribes for consumption
-    // by the client.
-    public final Message msg;
-
-    // Constructor
-    public MessageConsumeData(final TopicSubscriber topicSubscriber, final 
Message msg) {
-        this.topicSubscriber = topicSubscriber;
-        this.msg = msg;
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        if (topicSubscriber != null) {
-            sb.append("Subscription: ").append(topicSubscriber);
-        }
-        if (msg != null) {
-            sb.append(PubSubData.COMMA).append("Message: ").append(msg);
-        }
-        return sb.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/data/PubSubData.java
----------------------------------------------------------------------
diff --git 
a/hedwig-client/src/main/java/org/apache/hedwig/client/data/PubSubData.java 
b/hedwig-client/src/main/java/org/apache/hedwig/client/data/PubSubData.java
deleted file mode 100644
index 63547a0..0000000
--- a/hedwig-client/src/main/java/org/apache/hedwig/client/data/PubSubData.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.data;
-
-import java.util.List;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.client.netty.HChannel;
-import org.apache.hedwig.protocol.PubSubProtocol;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
-import org.apache.hedwig.util.Callback;
-
-/**
- * Wrapper class to store all of the data points needed to encapsulate all
- * PubSub type of request operations the client will do. This includes knowing
- * all of the information needed if we need to redo the publish/subscribe
- * request in case of a server redirect. This will be used for all sync/async
- * calls, and for all the known types of request messages to send to the server
- * hubs: Publish, Subscribe, Unsubscribe, and Consume.
- *
- */
-public class PubSubData {
-    // Static string constants
-    protected static final String COMMA = ", ";
-
-    // Member variables needed during object construction time.
-    public final ByteString topic;
-    public final Message msg;
-    public final ByteString subscriberId;
-    // Enum to indicate what type of operation this PubSub request data object
-    // is for.
-    public final OperationType operationType;
-    // Options for the subscription
-    public final SubscriptionOptions options;
-
-    // These two variables are not final since we might override them
-    // in the case of a Subscribe reconnect.
-    private Callback<PubSubProtocol.ResponseBody> callback;
-
-    public Object context;
-
-    // Member variables used after object has been constructed.
-    // List of all servers we've sent the PubSubRequest to successfully.
-    // This is to keep track of redirected servers that responded back to us.
-    public List<ByteString> triedServers;
-    // List of all servers that we've tried to connect or write to but
-    // was unsuccessful. We'll retry sending the PubSubRequest but will
-    // quit if we're trying to connect or write to a server that we've
-    // attempted to previously.
-    public List<ByteString> connectFailedServers;
-    public List<ByteString> writeFailedServers;
-    // Boolean to the hub server indicating if it should claim ownership
-    // of the topic the PubSubRequest is for. This is mainly used after
-    // a server redirect. Defaults to false.
-    public boolean shouldClaim = false;
-    // TxnID for the PubSubData if it was sent as a PubSubRequest to the hub
-    // server. This is used in the WriteCallback in case of failure. We want
-    // to remove it from the ResponseHandler.txn2PubSubData map since the
-    // failed PubSubRequest will not get an ack response from the server.
-    // This is set later in the PubSub flows only when we write the actual
-    // request. Therefore it is not an argument in the constructor.
-    public long txnId;
-    // Time in milliseconds using the System.currentTimeMillis() call when the
-    // PubSubRequest was written on the netty Channel to the server.
-    public long requestWriteTime;
-    // For synchronous calls, this variable is used to know when the background
-    // async process for it has completed, set in the VoidCallback.
-    public boolean isDone = false;
-    // Record the original channel for a resubscribe request
-    private HChannel origChannel = null;
-
-    // Constructor for all types of PubSub request data to send to the server
-    public PubSubData(final ByteString topic, final Message msg, final 
ByteString subscriberId,
-                      final OperationType operationType, final 
SubscriptionOptions options,
-                      final Callback<PubSubProtocol.ResponseBody> callback,
-                      final Object context) {
-        this.topic = topic;
-        this.msg = msg;
-        this.subscriberId = subscriberId;
-        this.operationType = operationType;
-        this.options = options;
-        this.callback = callback;
-        this.context = context;
-    }
-
-    public void setCallback(Callback<PubSubProtocol.ResponseBody> callback) {
-        this.callback = callback;
-    }
-
-    public Callback<PubSubProtocol.ResponseBody> getCallback() {
-        return callback;
-    }
-
-    public void operationFinishedToCallback(Object context, 
PubSubProtocol.ResponseBody response){
-        callback.operationFinished(context, response);
-    }
-
-    public boolean isResubscribeRequest() {
-        return null != origChannel;
-    }
-
-    public HChannel getOriginalChannelForResubscribe() {
-        return origChannel;
-    }
-
-    public void setOriginalChannelForResubscribe(HChannel channel) {
-        this.origChannel = channel;
-    }
-
-    // Clear all of the stored servers we've contacted or attempted to in this
-    // request.
-    public void clearServersList() {
-        if (triedServers != null)
-            triedServers.clear();
-        if (connectFailedServers != null)
-            connectFailedServers.clear();
-        if (writeFailedServers != null)
-            writeFailedServers.clear();
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        if (topic != null)
-            sb.append("Topic: " + topic.toStringUtf8());
-        if (msg != null)
-            sb.append(COMMA).append("Message: " + msg);
-        if (subscriberId != null)
-            sb.append(COMMA).append("SubscriberId: " + 
subscriberId.toStringUtf8());
-        if (operationType != null)
-            sb.append(COMMA).append("Operation Type: " + 
operationType.toString());
-        if (options != null)
-            sb.append(COMMA).append("Create Or Attach: " + 
options.getCreateOrAttach().toString())
-                .append(COMMA).append("Message Bound: " + 
options.getMessageBound());
-        if (triedServers != null && triedServers.size() > 0) {
-            sb.append(COMMA).append("Tried Servers: ");
-            for (ByteString triedServer : triedServers) {
-                sb.append(triedServer.toStringUtf8()).append(COMMA);
-            }
-        }
-        if (connectFailedServers != null && connectFailedServers.size() > 0) {
-            sb.append(COMMA).append("Connect Failed Servers: ");
-            for (ByteString connectFailedServer : connectFailedServers) {
-                sb.append(connectFailedServer.toStringUtf8()).append(COMMA);
-            }
-        }
-        if (writeFailedServers != null && writeFailedServers.size() > 0) {
-            sb.append(COMMA).append("Write Failed Servers: ");
-            for (ByteString writeFailedServer : writeFailedServers) {
-                sb.append(writeFailedServer.toStringUtf8()).append(COMMA);
-            }
-        }
-        sb.append(COMMA).append("Should Claim: " + shouldClaim);
-        if (txnId != 0)
-            sb.append(COMMA).append("TxnID: " + txnId);
-        if (requestWriteTime != 0)
-            sb.append(COMMA).append("Request Write Time: " + requestWriteTime);
-        sb.append(COMMA).append("Is Done: " + isDone);
-        return sb.toString();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/data/TopicSubscriber.java
----------------------------------------------------------------------
diff --git 
a/hedwig-client/src/main/java/org/apache/hedwig/client/data/TopicSubscriber.java
 
b/hedwig-client/src/main/java/org/apache/hedwig/client/data/TopicSubscriber.java
deleted file mode 100644
index 064cec1..0000000
--- 
a/hedwig-client/src/main/java/org/apache/hedwig/client/data/TopicSubscriber.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.data;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-
-import com.google.protobuf.ByteString;
-
-/**
- * Wrapper class object for the Topic + SubscriberId combination. Since the
- * Subscribe flows always use the Topic + SubscriberId as the logical entity,
- * we'll create a simple class to encapsulate that.
- *
- */
-public class TopicSubscriber {
-    private final ByteString topic;
-    private final ByteString subscriberId;
-    private final int hashCode;
-
-    public TopicSubscriber(final ByteString topic, final ByteString 
subscriberId) {
-        this.topic = topic;
-        this.subscriberId = subscriberId;
-        hashCode = new 
HashCodeBuilder().append(topic).append(subscriberId).toHashCode();
-    }
-
-    @Override
-    public boolean equals(final Object o) {
-        if (o == this)
-            return true;
-        if (!(o instanceof TopicSubscriber))
-            return false;
-        final TopicSubscriber obj = (TopicSubscriber) o;
-        return topic.equals(obj.topic) && 
subscriberId.equals(obj.subscriberId);
-    }
-
-    @Override
-    public int hashCode() {
-        return hashCode;
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        if (topic != null)
-            sb.append("Topic: " + topic.toStringUtf8());
-        if (subscriberId != null)
-            sb.append(PubSubData.COMMA).append("SubscriberId: " + 
subscriberId.toStringUtf8());
-        return sb.toString();
-    }
-
-    public ByteString getTopic() {
-        return topic;
-    }
-
-    public ByteString getSubscriberId() {
-        return subscriberId;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/AlreadyStartDeliveryException.java
----------------------------------------------------------------------
diff --git 
a/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/AlreadyStartDeliveryException.java
 
b/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/AlreadyStartDeliveryException.java
deleted file mode 100644
index 5f468e6..0000000
--- 
a/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/AlreadyStartDeliveryException.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.exceptions;
-
-/**
- * This is a Hedwig client side exception when the local client wants to
- * startDelivery using another message handler before stopping previous one.
- */
-public class AlreadyStartDeliveryException extends Exception {
-
-    private static final long serialVersionUID = 873259807218723524L;
-
-    public AlreadyStartDeliveryException(String message) {
-        super(message);
-    }
-
-    public AlreadyStartDeliveryException(String message, Throwable t) {
-        super(message, t);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/InvalidSubscriberIdException.java
----------------------------------------------------------------------
diff --git 
a/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/InvalidSubscriberIdException.java
 
b/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/InvalidSubscriberIdException.java
deleted file mode 100644
index 3e54356..0000000
--- 
a/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/InvalidSubscriberIdException.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.exceptions;
-
-/**
- * This is a Hedwig client side exception when the local client wants to do
- * subscribe type of operations. Currently, to distinguish between local and 
hub
- * subscribers, the subscriberId will have a specific format.
- */
-public class InvalidSubscriberIdException extends Exception {
-
-    private static final long serialVersionUID = 873259807218723523L;
-
-    public InvalidSubscriberIdException(String message) {
-        super(message);
-    }
-
-    public InvalidSubscriberIdException(String message, Throwable t) {
-        super(message, t);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/NoResponseHandlerException.java
----------------------------------------------------------------------
diff --git 
a/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/NoResponseHandlerException.java
 
b/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/NoResponseHandlerException.java
deleted file mode 100644
index 22b44b1..0000000
--- 
a/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/NoResponseHandlerException.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.exceptions;
-
-/**
- * This is a Hedwig client side exception thrown when it can't get the response
- * handler from the channel pipeline responsible for a PubSubRequest.
- */
-public class NoResponseHandlerException extends Exception {
-    private static final long serialVersionUID = 1L;
-
-    public NoResponseHandlerException(String message) {
-        super(message);
-    }
-
-    public NoResponseHandlerException(String message, Throwable t) {
-        super(message, t);
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/ResubscribeException.java
----------------------------------------------------------------------
diff --git 
a/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/ResubscribeException.java
 
b/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/ResubscribeException.java
deleted file mode 100644
index c9aeb38..0000000
--- 
a/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/ResubscribeException.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.exceptions;
-
-/**
- * This is a Hedwig client side exception when the client failed to resubscribe
- * when topic moved or subscription is closed.
- */
-public class ResubscribeException extends Exception {
-
-    public ResubscribeException(String message) {
-        super(message);
-    }
-
-    public ResubscribeException(String message, Throwable t) {
-        super(message, t);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/ServerRedirectLoopException.java
----------------------------------------------------------------------
diff --git 
a/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/ServerRedirectLoopException.java
 
b/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/ServerRedirectLoopException.java
deleted file mode 100644
index da6d4e7..0000000
--- 
a/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/ServerRedirectLoopException.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.exceptions;
-
-/**
- * This is a Hedwig client side exception when the PubSubRequest is being
- * redirected to a server where the request has already been sent to 
previously.
- * To avoid having a cyclical redirect loop, this condition is checked for
- * and this exception will be thrown to the client caller.
- */
-public class ServerRedirectLoopException extends Exception {
-
-    private static final long serialVersionUID = 98723508723152897L;
-
-    public ServerRedirectLoopException(String message) {
-        super(message);
-    }
-
-    public ServerRedirectLoopException(String message, Throwable t) {
-        super(message, t);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/TooManyServerRedirectsException.java
----------------------------------------------------------------------
diff --git 
a/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/TooManyServerRedirectsException.java
 
b/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/TooManyServerRedirectsException.java
deleted file mode 100644
index 4a3c99f..0000000
--- 
a/hedwig-client/src/main/java/org/apache/hedwig/client/exceptions/TooManyServerRedirectsException.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.exceptions;
-
-/**
- * This is a Hedwig client side exception when there have been too many server
- * redirects during a publish/subscribe call. We only allow a certain number of
- * server redirects to find the topic master. If we have exceeded this
- * configured amount, the publish/subscribe will fail with this exception.
- *
- */
-public class TooManyServerRedirectsException extends Exception {
-
-    private static final long serialVersionUID = 2341192937965635310L;
-
-    public TooManyServerRedirectsException(String message) {
-        super(message);
-    }
-
-    public TooManyServerRedirectsException(String message, Throwable t) {
-        super(message, t);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/AbstractResponseHandler.java
----------------------------------------------------------------------
diff --git 
a/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/AbstractResponseHandler.java
 
b/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/AbstractResponseHandler.java
deleted file mode 100644
index b8a6787..0000000
--- 
a/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/AbstractResponseHandler.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.handlers;
-
-import java.net.InetSocketAddress;
-import java.util.LinkedList;
-
-import com.google.protobuf.ByteString;
-
-import org.jboss.netty.channel.Channel;
-
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.netty.HChannelManager;
-import org.apache.hedwig.client.exceptions.ServerRedirectLoopException;
-import org.apache.hedwig.client.exceptions.TooManyServerRedirectsException;
-import org.apache.hedwig.client.netty.NetUtils;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
-import org.apache.hedwig.util.HedwigSocketAddress;
-import static org.apache.hedwig.util.VarArgs.va;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public abstract class AbstractResponseHandler {
-
-    private final static Logger logger = 
LoggerFactory.getLogger(AbstractResponseHandler.class);
-
-    protected final ClientConfiguration cfg;
-    protected final HChannelManager channelManager;
-
-    protected AbstractResponseHandler(ClientConfiguration cfg,
-                                      HChannelManager channelManager) {
-        this.cfg = cfg;
-        this.channelManager = channelManager;
-    }
-
-    /**
-     * Logic to handle received response.
-     *
-     * @param response
-     *            PubSubResponse received from hub server.
-     * @param pubSubData
-     *            PubSubData for the pub/sub request.
-     * @param channel
-     *            Channel we used to make the request.
-     */
-    public abstract void handleResponse(PubSubResponse response, PubSubData 
pubSubData,
-                                        Channel channel) throws Exception;
-
-    /**
-     * Logic to repost a PubSubRequest when the server responds with a redirect
-     * indicating they are not the topic master.
-     *
-     * @param response
-     *            PubSubResponse from the server for the redirect
-     * @param pubSubData
-     *            PubSubData for the original PubSubRequest made
-     * @param channel
-     *            Channel Channel we used to make the original PubSubRequest
-     * @throws Exception
-     *             Throws an exception if there was an error in doing the
-     *             redirect repost of the PubSubRequest
-     */
-    protected void handleRedirectResponse(PubSubResponse response, PubSubData 
pubSubData,
-                                          Channel channel)
-            throws Exception {
-        if (logger.isDebugEnabled()) {
-            logger.debug("Handling a redirect from host: {}, response: {}, 
pubSubData: {}",
-                         va(NetUtils.getHostFromChannel(channel), response, 
pubSubData));
-        }
-        // In this case, the PubSub request was done to a server that is not
-        // responsible for the topic. First make sure that we haven't
-        // exceeded the maximum number of server redirects.
-        int curNumServerRedirects = (pubSubData.triedServers == null) ? 0 : 
pubSubData.triedServers.size();
-        if (curNumServerRedirects >= cfg.getMaximumServerRedirects()) {
-            // We've already exceeded the maximum number of server redirects
-            // so consider this as an error condition for the client.
-            // Invoke the operationFailed callback and just return.
-            logger.debug("Exceeded the number of server redirects ({}) so 
error out.",
-                         curNumServerRedirects);
-            PubSubException exception = new ServiceDownException(
-                new TooManyServerRedirectsException("Already reached max 
number of redirects: "
-                                                    + curNumServerRedirects));
-            pubSubData.getCallback().operationFailed(pubSubData.context, 
exception);
-            return;
-        }
-
-        // We will redirect and try to connect to the correct server
-        // stored in the StatusMsg of the response. First store the
-        // server that we sent the PubSub request to for the topic.
-        ByteString triedServer = 
ByteString.copyFromUtf8(HedwigSocketAddress.sockAddrStr(
-                                                         
NetUtils.getHostFromChannel(channel)));
-        if (pubSubData.triedServers == null) {
-            pubSubData.triedServers = new LinkedList<ByteString>();
-        }
-        pubSubData.shouldClaim = true;
-        pubSubData.triedServers.add(triedServer);
-
-        // Now get the redirected server host (expected format is
-        // Hostname:Port:SSLPort) from the server's response message. If one is
-        // not given for some reason, then redirect to the default server
-        // host/VIP to repost the request.
-        String statusMsg = response.getStatusMsg();
-        InetSocketAddress redirectedHost;
-        boolean redirectToDefaultServer;
-        if (statusMsg != null && statusMsg.length() > 0) {
-            if (cfg.isSSLEnabled()) {
-                redirectedHost = new 
HedwigSocketAddress(statusMsg).getSSLSocketAddress();
-            } else {
-                redirectedHost = new 
HedwigSocketAddress(statusMsg).getSocketAddress();
-            }
-            redirectToDefaultServer = false;
-        } else {
-            redirectedHost = cfg.getDefaultServerHost();
-            redirectToDefaultServer = true;
-        }
-
-        // Make sure the redirected server is not one we've already attempted
-        // already before in this PubSub request.
-        if 
(pubSubData.triedServers.contains(ByteString.copyFromUtf8(HedwigSocketAddress.sockAddrStr(redirectedHost))))
 {
-            logger.error("We've already sent this PubSubRequest before to 
redirectedHost: {}, pubSubData: {}",
-                         va(redirectedHost, pubSubData));
-            PubSubException exception = new ServiceDownException(
-                new ServerRedirectLoopException("Already made the request 
before to redirected host: "
-                                                + redirectedHost));
-            pubSubData.getCallback().operationFailed(pubSubData.context, 
exception);
-            return;
-        }
-
-        // submit the pub/sub request to redirected host
-        if (redirectToDefaultServer) {
-            channelManager.submitOpToDefaultServer(pubSubData);
-        } else {
-            channelManager.redirectToHost(pubSubData, redirectedHost);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/CloseSubscriptionResponseHandler.java
----------------------------------------------------------------------
diff --git 
a/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/CloseSubscriptionResponseHandler.java
 
b/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/CloseSubscriptionResponseHandler.java
deleted file mode 100644
index c5b58a0..0000000
--- 
a/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/CloseSubscriptionResponseHandler.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.handlers;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.Channel;
-
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.netty.HChannelManager;
-import 
org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
-import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
-
-public class CloseSubscriptionResponseHandler extends AbstractResponseHandler {
-
-    private final static Logger logger = 
LoggerFactory.getLogger(CloseSubscriptionResponseHandler.class);
-
-    public CloseSubscriptionResponseHandler(ClientConfiguration cfg,
-                                            HChannelManager channelManager) {
-        super(cfg, channelManager);
-    }
-
-    @Override
-    public void handleResponse(final PubSubResponse response, final PubSubData 
pubSubData,
-                               final Channel channel)
-            throws Exception {
-        switch (response.getStatusCode()) {
-        case SUCCESS:
-            pubSubData.getCallback().operationFinished(pubSubData.context, 
null);
-            break;
-        case CLIENT_NOT_SUBSCRIBED:
-            // For closesubscription requests, the server says that the client was
-            // never subscribed to the topic.
-            pubSubData.getCallback().operationFailed(pubSubData.context, new 
ClientNotSubscribedException(
-                                                    "Client was never 
subscribed to topic: " +
-                                                        
pubSubData.topic.toStringUtf8() + ", subscriberId: " +
-                                                        
pubSubData.subscriberId.toStringUtf8()));
-            break;
-        case SERVICE_DOWN:
-            // Response was service down failure so just invoke the callback's
-            // operationFailed method.
-            pubSubData.getCallback().operationFailed(pubSubData.context, new 
ServiceDownException(
-                                                    "Server responded with a 
SERVICE_DOWN status"));
-            break;
-        case NOT_RESPONSIBLE_FOR_TOPIC:
-            // Redirect response so we'll need to repost the original
-            // CloseSubscription Request
-            handleRedirectResponse(response, pubSubData, channel);
-            break;
-        default:
-            // Consider all other status codes as errors, operation failed
-            // cases.
-            logger.error("Unexpected error response from server for 
PubSubResponse: " + response);
-            pubSubData.getCallback().operationFailed(pubSubData.context, new 
ServiceDownException(
-                                                    "Server responded with a 
status code of: " +
-                                                        
response.getStatusCode()));
-            break;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/MessageConsumeCallback.java
----------------------------------------------------------------------
diff --git 
a/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/MessageConsumeCallback.java
 
b/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/MessageConsumeCallback.java
deleted file mode 100644
index 32517b0..0000000
--- 
a/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/MessageConsumeCallback.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.handlers;
-
-import java.util.TimerTask;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.data.MessageConsumeData;
-import org.apache.hedwig.client.netty.HChannelManager;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protoextensions.MessageIdUtils;
-import org.apache.hedwig.util.Callback;
-import static org.apache.hedwig.util.VarArgs.va;
-
-/**
- * This is the Callback used by the MessageHandlers on the client app when
- * they've finished consuming a subscription message sent from the server
- * asynchronously. This callback back to the client libs will be stateless so we
- * can use a singleton for the class. The object context used should be the
- * MessageConsumeData type. That will contain all of the information needed to
- * call the message consume logic in the client lib HChannelHandler.
- *
- */
-public class MessageConsumeCallback implements Callback<Void> {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(MessageConsumeCallback.class);
-
-    private final HChannelManager channelManager;
-    private final long consumeRetryWaitTime;
-
-    public MessageConsumeCallback(ClientConfiguration cfg,
-                                  HChannelManager channelManager) {
-        this.channelManager = channelManager;
-        this.consumeRetryWaitTime =
-            cfg.getMessageConsumeRetryWaitTime();
-    }
-
-    class MessageConsumeRetryTask extends TimerTask {
-        private final MessageConsumeData messageConsumeData;
-
-        public MessageConsumeRetryTask(MessageConsumeData messageConsumeData) {
-            this.messageConsumeData = messageConsumeData;
-        }
-
-        @Override
-        public void run() {
-            // Try to consume the message again
-            SubscribeResponseHandler subscribeHChannelHandler =
-                
channelManager.getSubscribeResponseHandler(messageConsumeData.topicSubscriber);
-            if (null == subscribeHChannelHandler ||
-                
!subscribeHChannelHandler.hasSubscription(messageConsumeData.topicSubscriber)) {
-                logger.warn("No subscription {} found to retry delivering 
message {}.",
-                            va(messageConsumeData.topicSubscriber,
-                               
MessageIdUtils.msgIdToReadableString(messageConsumeData.msg.getMsgId())));
-                return;
-            }
-
-            
subscribeHChannelHandler.asyncMessageDeliver(messageConsumeData.topicSubscriber,
-                                                         
messageConsumeData.msg);
-        }
-    }
-
-    public void operationFinished(Object ctx, Void resultOfOperation) {
-        MessageConsumeData messageConsumeData = (MessageConsumeData) ctx;
-
-        SubscribeResponseHandler subscribeHChannelHandler =
-            
channelManager.getSubscribeResponseHandler(messageConsumeData.topicSubscriber);
-        if (null == subscribeHChannelHandler ||
-            
!subscribeHChannelHandler.hasSubscription(messageConsumeData.topicSubscriber)) {
-            logger.warn("No subscription {} found to consume message {}.",
-                        va(messageConsumeData.topicSubscriber,
-                           
MessageIdUtils.msgIdToReadableString(messageConsumeData.msg.getMsgId())));
-            return;
-        }
-
-        // Message has been successfully consumed by the client app so callback
-        // to the HChannelHandler indicating that the message is consumed.
-        
subscribeHChannelHandler.messageConsumed(messageConsumeData.topicSubscriber,
-                                                 messageConsumeData.msg);
-    }
-
-    public void operationFailed(Object ctx, PubSubException exception) {
-        // Message has NOT been successfully consumed by the client app so
-        // callback to the HChannelHandler to try the async MessageHandler
-        // Consume logic again.
-        MessageConsumeData messageConsumeData = (MessageConsumeData) ctx;
-        logger.error("Message was not consumed successfully by client 
MessageHandler: {}",
-                     messageConsumeData);
-
-        // Sleep a pre-configured amount of time (in milliseconds) before we
-        // do the retry. In the future, we can have more dynamic logic on
-        // what duration to sleep based on how many times we've retried, or
-        // perhaps what the last amount of time we slept was. We could stick
-        // some of this meta-data into the MessageConsumeData when we retry.
-        channelManager.schedule(new 
MessageConsumeRetryTask(messageConsumeData),
-                                consumeRetryWaitTime);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/PubSubCallback.java
----------------------------------------------------------------------
diff --git 
a/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/PubSubCallback.java
 
b/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/PubSubCallback.java
deleted file mode 100644
index 3f5fbcd..0000000
--- 
a/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/PubSubCallback.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.handlers;
-
-import org.apache.hedwig.protocol.PubSubProtocol;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.util.Callback;
-
-/**
- * This class is used when we are doing synchronous type of operations. All
- * underlying client ops in Hedwig are async so this is just a way to make the
- * async calls synchronous.
- *
- */
-public class PubSubCallback implements Callback<PubSubProtocol.ResponseBody> {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(PubSubCallback.class);
-
-    // Private member variables
-    private final PubSubData pubSubData;
-    // Boolean indicator to see if the sync PubSub call was successful or not.
-    private boolean isCallSuccessful;
-    // For sync callbacks, we'd like to know what the PubSubException is thrown
-    // on failure. This is so we can have a handle to the exception and rethrow
-    // it later.
-    private PubSubException failureException;
-
-    private PubSubProtocol.ResponseBody responseBody;
-
-    // Constructor
-    public PubSubCallback(PubSubData pubSubData) {
-        this.pubSubData = pubSubData;
-    }
-
-    public void operationFinished(Object ctx, PubSubProtocol.ResponseBody 
resultOfOperation) {
-        logger.debug("PubSub call succeeded for pubSubData: {}", pubSubData);
-        // Wake up the main sync PubSub thread that is waiting for us to
-        // complete.
-        synchronized (pubSubData) {
-            this.responseBody = resultOfOperation;
-            isCallSuccessful = true;
-            pubSubData.isDone = true;
-            pubSubData.notify();
-        }
-    }
-
-    public void operationFailed(Object ctx, PubSubException exception) {
-        logger.debug("PubSub call failed with exception: {}, pubSubData: {}", 
exception, pubSubData);
-        // Wake up the main sync PubSub thread that is waiting for us to
-        // complete.
-        synchronized (pubSubData) {
-            isCallSuccessful = false;
-            failureException = exception;
-            pubSubData.isDone = true;
-            pubSubData.notify();
-        }
-    }
-
-    // Public getter to determine if the PubSub callback is successful or not
-    // based on the PubSub ack response from the server.
-    public boolean getIsCallSuccessful() {
-        return isCallSuccessful;
-    }
-
-    // Public getter to retrieve what the PubSubException was that occurred when
-    // the operation failed.
-    public PubSubException getFailureException() {
-        return failureException;
-    }
-
-
-    public PubSubProtocol.ResponseBody getResponseBody() {
-        return responseBody;
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/PublishResponseHandler.java
----------------------------------------------------------------------
diff --git 
a/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/PublishResponseHandler.java
 
b/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/PublishResponseHandler.java
deleted file mode 100644
index 8b2e9d1..0000000
--- 
a/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/PublishResponseHandler.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.client.handlers;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.Channel;
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.data.PubSubData;
-import org.apache.hedwig.client.netty.HChannelManager;
-import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
-
-public class PublishResponseHandler extends AbstractResponseHandler {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(PublishResponseHandler.class);
-
-    public PublishResponseHandler(ClientConfiguration cfg,
-                                  HChannelManager channelManager) {
-        super(cfg, channelManager);
-    }
-
-    @Override
-    public void handleResponse(PubSubResponse response, PubSubData pubSubData,
-                               Channel channel) throws Exception {
-        switch (response.getStatusCode()) {
-        case SUCCESS:
-            // Response was success so invoke the callback's operationFinished
-            // method.
-            pubSubData.operationFinishedToCallback(pubSubData.context,
-                response.hasResponseBody() ? response.getResponseBody() : 
null);
-            break;
-        case SERVICE_DOWN:
-            // Response was service down failure so just invoke the callback's
-            // operationFailed method.
-            pubSubData.getCallback().operationFailed(pubSubData.context, new 
ServiceDownException(
-                                                    "Server responded with a 
SERVICE_DOWN status"));
-            break;
-        case NOT_RESPONSIBLE_FOR_TOPIC:
-            // Redirect response so we'll need to repost the original Publish
-            // Request
-            handleRedirectResponse(response, pubSubData, channel);
-            break;
-        default:
-            // Consider all other status codes as errors, operation failed
-            // cases.
-            logger.error("Unexpected error response from server for 
PubSubResponse: " + response);
-            pubSubData.getCallback().operationFailed(pubSubData.context, new 
ServiceDownException(
-                                                    "Server responded with a 
status code of: " +
-                                                        
response.getStatusCode()));
-            break;
-        }
-    }
-}

Reply via email to