http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/ServerSetRoutingService.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/ServerSetRoutingService.java
 
b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/ServerSetRoutingService.java
deleted file mode 100644
index 4fe8141..0000000
--- 
a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/ServerSetRoutingService.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.routing;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hashing;
-import org.apache.distributedlog.service.DLSocketAddress;
-import com.twitter.finagle.NoBrokersAvailableException;
-import com.twitter.finagle.stats.StatsReceiver;
-import java.net.SocketAddress;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Routing Service based on a given {@link com.twitter.common.zookeeper.ServerSet}.
- */
-class ServerSetRoutingService extends Thread implements RoutingService {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(ServerSetRoutingService.class);
-
-    /**
-     * Creates a new builder for a {@link ServerSetRoutingService}.
-     *
-     * @return a fresh {@link ServerSetRoutingServiceBuilder} with no watcher configured
-     */
-    static ServerSetRoutingServiceBuilder newServerSetRoutingServiceBuilder() {
-        return new ServerSetRoutingServiceBuilder();
-    }
-
-    /**
-     * Builder to build {@link com.twitter.common.zookeeper.ServerSet} based 
routing service.
-     */
-    static class ServerSetRoutingServiceBuilder implements 
RoutingService.Builder {
-
-        private ServerSetWatcher serverSetWatcher;
-
-        private ServerSetRoutingServiceBuilder() {}
-
-        public ServerSetRoutingServiceBuilder 
serverSetWatcher(ServerSetWatcher serverSetWatcher) {
-            this.serverSetWatcher = serverSetWatcher;
-            return this;
-        }
-
-        @Override
-        public Builder statsReceiver(StatsReceiver statsReceiver) {
-            return this;
-        }
-
-        @Override
-        public RoutingService build() {
-            checkNotNull(serverSetWatcher, "No serverset watcher provided.");
-            return new ServerSetRoutingService(this.serverSetWatcher);
-        }
-    }
-
-    private static class HostComparator implements Comparator<SocketAddress> {
-
-        private static final HostComparator INSTANCE = new HostComparator();
-
-        @Override
-        public int compare(SocketAddress o1, SocketAddress o2) {
-            return o1.toString().compareTo(o2.toString());
-        }
-    }
-
-    // Source of server set change notifications; watched from run().
-    private final ServerSetWatcher serverSetWatcher;
-
-    // Current known hosts. Also used as the monitor guarding both hostSet and hostList.
-    private final Set<SocketAddress> hostSet = new HashSet<SocketAddress>();
-    // Sorted copy of hostSet, rebuilt whenever the host set changes; indexed by hashed key.
-    private List<SocketAddress> hostList = new ArrayList<SocketAddress>();
-    // Hash function applied to routing keys in getHost().
-    private final HashFunction hasher = Hashing.md5();
-
-    // Server Set Changes
-    // Holds the most recent server set that still has to be applied; null when no
-    // onChange() invocation is currently applying changes.
-    private final AtomicReference<ImmutableSet<DLSocketAddress>> 
serverSetChange =
-            new AtomicReference<ImmutableSet<DLSocketAddress>>(null);
-    // Counted down after a server set change has been applied; startService() waits on it.
-    private final CountDownLatch changeLatch = new CountDownLatch(1);
-
-    // Listeners
-    // Notified from performServerSetChange() when servers join or leave.
-    protected final CopyOnWriteArraySet<RoutingListener> listeners =
-            new CopyOnWriteArraySet<RoutingListener>();
-
-    /**
-     * Creates the routing service thread, named "ServerSetRoutingService".
-     *
-     * @param serverSetWatcher watcher that reports server set changes
-     */
-    ServerSetRoutingService(ServerSetWatcher serverSetWatcher) {
-        super("ServerSetRoutingService");
-        this.serverSetWatcher = serverSetWatcher;
-    }
-
-    /**
-     * Returns an immutable snapshot of the currently known hosts.
-     */
-    @Override
-    public Set<SocketAddress> getHosts() {
-        final Set<SocketAddress> snapshot;
-        synchronized (hostSet) {
-            snapshot = ImmutableSet.copyOf(hostSet);
-        }
-        return snapshot;
-    }
-
-    /**
-     * Starts the watcher thread and waits up to one minute for the first server set change.
-     *
-     * <p>The method returns even when no change arrives in time or when the wait is
-     * interrupted; in the latter case the caller's interrupt status is restored.
-     */
-    @Override
-    public void startService() {
-        start();
-        try {
-            if (!changeLatch.await(1, TimeUnit.MINUTES)) {
-                logger.warn("No serverset change received in 1 minute.");
-            }
-        } catch (InterruptedException e) {
-            // Restore the interrupt status so the caller can still observe the interruption,
-            // consistent with stopService().
-            Thread.currentThread().interrupt();
-            logger.warn("Interrupted waiting first serverset change : ", e);
-        }
-        logger.info("{} Routing Service Started.", getClass().getSimpleName());
-    }
-
-    /**
-     * Interrupts the routing service thread and waits for it to terminate.
-     *
-     * <p>If the calling thread is interrupted while waiting, its interrupt status is
-     * restored and the method returns without waiting further.
-     */
-    @Override
-    public void stopService() {
-        // Interrupt this service thread, not the caller: interrupting the caller would make
-        // the join() below fail immediately with InterruptedException.
-        interrupt();
-        try {
-            join();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            logger.warn("Interrupted on waiting serverset routing service to finish : ", e);
-        }
-        logger.info("{} Routing Service Stopped.", getClass().getSimpleName());
-    }
-
-    /**
-     * Adds a listener to be notified when servers join or leave.
-     *
-     * @param listener listener to add
-     * @return this routing service
-     */
-    @Override
-    public RoutingService registerListener(RoutingListener listener) {
-        listeners.add(listener);
-        return this;
-    }
-
-    /**
-     * Removes a previously registered listener.
-     *
-     * @param listener listener to remove
-     * @return this routing service
-     */
-    @Override
-    public RoutingService unregisterListener(RoutingListener listener) {
-        listeners.remove(listener);
-        return this;
-    }
-
-    /**
-     * Picks the host that should serve {@code key}.
-     *
-     * <p>The key is hashed onto the sorted host list. If the selected host was already tried
-     * according to {@code rContext}, it is removed from a copy of the list, the key is
-     * rehashed onto the remaining hosts, and the remaining hosts are probed in order until
-     * an untried one is found.
-     *
-     * @param key routing key
-     * @param rContext routing context recording which hosts were already tried
-     * @return the selected host, never {@code null}
-     * @throws NoBrokersAvailableException if no host is known or every host was already tried
-     */
-    @Override
-    public SocketAddress getHost(String key, RoutingContext rContext)
-            throws NoBrokersAvailableException {
-        SocketAddress address = null;
-        synchronized (hostSet) {
-            if (!hostList.isEmpty()) {
-                int hashCode = hasher.hashUnencodedChars(key).asInt();
-                int hostId = signSafeMod(hashCode, hostList.size());
-                address = hostList.get(hostId);
-                if (rContext.isTriedHost(address)) {
-                    List<SocketAddress> newList = new ArrayList<SocketAddress>(hostList);
-                    newList.remove(hostId);
-                    if (newList.isEmpty()) {
-                        // The only known host was already tried. Rehashing onto an empty list
-                        // would divide by zero, so report that no host is available instead.
-                        address = null;
-                    } else {
-                        // pickup a new host by rehashing it.
-                        hostId = signSafeMod(hashCode, newList.size());
-                        address = newList.get(hostId);
-                        int i = hostId;
-                        while (rContext.isTriedHost(address)) {
-                            i = (i + 1) % newList.size();
-                            if (i == hostId) {
-                                // Wrapped around: every remaining host was already tried.
-                                address = null;
-                                break;
-                            }
-                            address = newList.get(i);
-                        }
-                    }
-                }
-            }
-        }
-        if (null == address) {
-            throw new NoBrokersAvailableException("No host is available.");
-        }
-        return address;
-    }
-
-    @Override
-    public void removeHost(SocketAddress host, Throwable reason) {
-        synchronized (hostSet) {
-            if (hostSet.remove(host)) {
-                logger.info("Node {} left due to : ", host, reason);
-            }
-            hostList = new ArrayList<SocketAddress>(hostSet);
-            Collections.sort(hostList, HostComparator.INSTANCE);
-            logger.info("Host list becomes : {}.", hostList);
-        }
-    }
-
-    /**
-     * Thread body: registers a monitor with the server set watcher.
-     *
-     * <p>Each notification is stored in {@code serverSetChange}. The invocation that finds the
-     * slot empty applies changes until the slot can be reset to {@code null}; invocations
-     * that find it occupied only replace the pending value and return. If registering the
-     * monitor fails, the error is logged and the JVM exits with status -1.
-     */
-    @Override
-    public void run() {
-        try {
-            serverSetWatcher.watch(new ServerSetWatcher.ServerSetMonitor() {
-                @Override
-                public void onChange(ImmutableSet<DLSocketAddress> 
serviceInstances) {
-                    // Publish the newest server set; a non-null previous value means another
-                    // invocation is still applying changes and will pick this one up.
-                    ImmutableSet<DLSocketAddress> lastValue = 
serverSetChange.getAndSet(serviceInstances);
-                    if (null == lastValue) {
-                        ImmutableSet<DLSocketAddress> mostRecentValue;
-                        do {
-                            mostRecentValue = serverSetChange.get();
-                            performServerSetChange(mostRecentValue);
-                            // Releases startService() once a change has been applied.
-                            changeLatch.countDown();
-                            // The CAS fails when a newer value arrived meanwhile; loop to apply it.
-                        } while 
(!serverSetChange.compareAndSet(mostRecentValue, null));
-                    }
-                }
-            });
-        } catch (Exception e) {
-            logger.error("Fail to monitor server set : ", e);
-            // Monitoring failure is treated as fatal for the whole process.
-            Runtime.getRuntime().exit(-1);
-        }
-    }
-
-    /**
-     * Applies a new server set: updates the host set, notifies listeners about servers that
-     * left and joined, and finally rebuilds the sorted host list.
-     *
-     * @param serverSet the complete, current set of servers
-     */
-    protected synchronized void performServerSetChange(ImmutableSet<DLSocketAddress> serverSet) {
-        Set<SocketAddress> latest = new HashSet<SocketAddress>();
-        for (DLSocketAddress instance : serverSet) {
-            latest.add(instance.getSocketAddress());
-        }
-
-        Set<SocketAddress> departed;
-        Set<SocketAddress> arrived;
-        synchronized (hostSet) {
-            // Snapshot both differences before mutating hostSet.
-            departed = Sets.difference(hostSet, latest).immutableCopy();
-            arrived = Sets.difference(latest, hostSet).immutableCopy();
-            for (SocketAddress gone : departed) {
-                if (hostSet.remove(gone)) {
-                    logger.info("Node {} left.", gone);
-                }
-            }
-            for (SocketAddress fresh : arrived) {
-                if (hostSet.add(fresh)) {
-                    logger.info("Node {} joined.", fresh);
-                }
-            }
-        }
-
-        // Listeners are called outside the hostSet monitor.
-        for (SocketAddress gone : departed) {
-            for (RoutingListener listener : listeners) {
-                listener.onServerLeft(gone);
-            }
-        }
-
-        for (SocketAddress fresh : arrived) {
-            for (RoutingListener listener : listeners) {
-                listener.onServerJoin(fresh);
-            }
-        }
-
-        synchronized (hostSet) {
-            List<SocketAddress> rebuilt = new ArrayList<SocketAddress>(hostSet);
-            Collections.sort(rebuilt, HostComparator.INSTANCE);
-            hostList = rebuilt;
-            logger.info("Host list becomes : {}.", hostList);
-        }
-
-    }
-
-    /**
-     * Computes {@code dividend % divisor} and adds {@code divisor} once when the remainder
-     * is negative, so that a positive divisor always yields a non-negative result.
-     *
-     * @param dividend value to reduce
-     * @param divisor modulus; must not be zero
-     * @return the adjusted remainder
-     */
-    static int signSafeMod(long dividend, int divisor) {
-        int remainder = (int) (dividend % divisor);
-        return remainder < 0 ? remainder + divisor : remainder;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/ServerSetWatcher.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/ServerSetWatcher.java
 
b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/ServerSetWatcher.java
deleted file mode 100644
index 77b7beb..0000000
--- 
a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/ServerSetWatcher.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.routing;
-
-import com.google.common.collect.ImmutableSet;
-import org.apache.distributedlog.service.DLSocketAddress;
-
-/**
- * Watches a server set and reports its changes to a {@link ServerSetMonitor}.
- */
-public interface ServerSetWatcher {
-
-    /**
-     * Exception thrown when monitoring the server set fails.
-     */
-    class MonitorException extends Exception {
-
-        private static final long serialVersionUID = 392751505154339548L;
-
-        public MonitorException(String msg) {
-            super(msg);
-        }
-
-        public MonitorException(String msg, Throwable cause) {
-            super(msg, cause);
-        }
-    }
-
-    /**
-     * Callback interface for objects interested in being notified whenever the host set changes.
-     */
-    interface ServerSetMonitor {
-
-        /**
-         * Called when the available set of services changes.
-         *
-         * <p>This happens when a service dies, when a new instance comes online, or
-         * when an existing service advertises a status or health change.
-         *
-         * @param hostSet the current set of available service instances
-         */
-        void onChange(ImmutableSet<DLSocketAddress> hostSet);
-    }
-
-    /**
-     * Registers a monitor to receive change notices for this server set as long as this
-     * JVM process is alive.
-     *
-     * <p>Blocks until the initial server set can be gathered and delivered to the monitor.
-     * The monitor will be notified if the membership set or the parameters of existing
-     * members have changed.
-     *
-     * @param monitor the server set monitor to call back when the host set changes
-     * @throws MonitorException if there is a problem monitoring the host set
-     */
-    void watch(final ServerSetMonitor monitor) throws MonitorException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/SingleHostRoutingService.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/SingleHostRoutingService.java
 
b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/SingleHostRoutingService.java
deleted file mode 100644
index 753a1af..0000000
--- 
a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/SingleHostRoutingService.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.routing;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.collect.Sets;
-import com.twitter.finagle.NoBrokersAvailableException;
-import com.twitter.finagle.stats.StatsReceiver;
-import java.net.SocketAddress;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-
-/**
- * Routing service that always routes to one single, configurable host.
- */
-public class SingleHostRoutingService implements RoutingService {
-
-    /**
-     * Creates a routing service that routes everything to {@code address}.
-     *
-     * @param address the only host to route to
-     * @return a new single host routing service
-     */
-    public static SingleHostRoutingService of(SocketAddress address) {
-        return new SingleHostRoutingService(address);
-    }
-
-    /**
-     * Builder to build single host based routing service.
-     *
-     * @return builder to build single host based routing service.
-     */
-    public static Builder newBuilder() {
-        return new Builder();
-    }
-
-    /**
-     * Builder to build single host based routing service.
-     */
-    public static class Builder implements RoutingService.Builder {
-
-        private SocketAddress address;
-
-        private Builder() {}
-
-        /**
-         * Sets the host that the routing service routes to.
-         *
-         * @param address the only host to route to
-         * @return this builder
-         */
-        public Builder address(SocketAddress address) {
-            this.address = address;
-            return this;
-        }
-
-        /**
-         * Accepts a stats receiver; this builder does not use it.
-         */
-        @Override
-        public RoutingService.Builder statsReceiver(StatsReceiver statsReceiver) {
-            return this;
-        }
-
-        /**
-         * Builds the routing service.
-         *
-         * @throws NullPointerException if no address was provided
-         */
-        @Override
-        public RoutingService build() {
-            checkNotNull(address, "Host is null");
-            return new SingleHostRoutingService(address);
-        }
-    }
-
-    private SocketAddress address;
-    private final CopyOnWriteArraySet<RoutingListener> listeners =
-            new CopyOnWriteArraySet<RoutingListener>();
-
-    SingleHostRoutingService(SocketAddress address) {
-        this.address = address;
-    }
-
-    /**
-     * Replaces the host that this service routes to.
-     *
-     * @param address the new host
-     */
-    public void setAddress(SocketAddress address) {
-        this.address = address;
-    }
-
-    @Override
-    public Set<SocketAddress> getHosts() {
-        return Sets.newHashSet(address);
-    }
-
-    /**
-     * Announces the configured host to every registered listener.
-     */
-    @Override
-    public void startService() {
-        // Nothing to start; just tell the listeners that the single host has joined.
-        for (RoutingListener listener : listeners) {
-            listener.onServerJoin(address);
-        }
-    }
-
-    @Override
-    public void stopService() {
-        // no-op
-    }
-
-    @Override
-    public RoutingService registerListener(RoutingListener listener) {
-        listeners.add(listener);
-        return this;
-    }
-
-    /**
-     * Removes a previously registered listener.
-     *
-     * @param listener listener to remove
-     * @return this routing service, so that calls can be chained like with
-     *         {@link #registerListener(RoutingListener)}
-     */
-    @Override
-    public RoutingService unregisterListener(RoutingListener listener) {
-        listeners.remove(listener);
-        // Return this service rather than null so that chained calls do not fail.
-        return this;
-    }
-
-    /**
-     * Returns the configured host unless the routing context reports it as already tried.
-     *
-     * @throws NoBrokersAvailableException if the configured host was already tried
-     */
-    @Override
-    public SocketAddress getHost(String key, RoutingContext rContext)
-            throws NoBrokersAvailableException {
-        if (rContext.isTriedHost(address)) {
-            throw new NoBrokersAvailableException("No hosts is available : routing context = " + rContext);
-        }
-        return address;
-    }
-
-    @Override
-    public void removeHost(SocketAddress address, Throwable reason) {
-        // no-op
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/TestName.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/TestName.java
 
b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/TestName.java
deleted file mode 100644
index 2fc8de0..0000000
--- 
a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/TestName.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.routing;
-
-import com.twitter.finagle.Addr;
-import com.twitter.finagle.Address;
-import com.twitter.finagle.Addrs;
-import com.twitter.finagle.Name;
-import java.util.List;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
-
-/**
- * A {@link Name} implementation for testing purposes.
- *
- * <p>Address changes are pushed to a single registered callback.
- */
-public class TestName implements Name {
-
-    private static final Logger LOG = LoggerFactory.getLogger(TestName.class);
-
-    private AbstractFunction1<Addr, BoxedUnit> callback = null;
-
-    /**
-     * Registers the callback that receives address changes, replacing any previous one.
-     *
-     * @param callback function invoked with the new bound address
-     */
-    public void changes(AbstractFunction1<Addr, BoxedUnit> callback) {
-        this.callback = callback;
-    }
-
-    /**
-     * Sends {@code addresses} to the registered callback; does nothing when none is registered.
-     *
-     * @param addresses the new list of addresses
-     */
-    public void changeAddrs(List<Address> addresses) {
-        if (null == callback) {
-            return;
-        }
-        LOG.info("Sending a callback {}", addresses);
-        callback.apply(Addrs.newBoundAddr(addresses));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/TwitterServerSetWatcher.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/TwitterServerSetWatcher.java
 
b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/TwitterServerSetWatcher.java
deleted file mode 100644
index 1ff7c93..0000000
--- 
a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/TwitterServerSetWatcher.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.routing;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
-import com.twitter.common.net.pool.DynamicHostSet;
-import com.twitter.common.zookeeper.ServerSet;
-import org.apache.distributedlog.service.DLSocketAddress;
-import com.twitter.thrift.Endpoint;
-import com.twitter.thrift.ServiceInstance;
-import java.net.InetSocketAddress;
-import java.util.Set;
-
-/**
- * Twitter {@link ServerSet} based watcher.
- */
-public class TwitterServerSetWatcher implements ServerSetWatcher {
-
-    private final ServerSet serverSet;
-    private final boolean resolvedFromName;
-
-    /**
-     * Construct a {@link ServerSet} based watcher.
-     *
-     * @param serverSet server set.
-     * @param resolvedFromName whether to resolve hosts from {@link com.twitter.finagle.Name}.
-     */
-    public TwitterServerSetWatcher(ServerSet serverSet,
-                                   boolean resolvedFromName) {
-        this.serverSet = serverSet;
-        this.resolvedFromName = resolvedFromName;
-    }
-
-    /**
-     * Registers a monitor to receive change notices for this server set as long as this
-     * JVM process is alive.
-     *
-     * <p>Each service instance is converted to a {@link DLSocketAddress} built from its
-     * "thrift" endpoint; the shard id is -1 when hosts are resolved from a name.
-     *
-     * @param monitor the server set monitor to call back when the host set changes
-     * @throws MonitorException if there is a problem monitoring the host set
-     */
-    @Override
-    public void watch(final ServerSetMonitor monitor)
-            throws MonitorException {
-        DynamicHostSet.HostChangeMonitor<ServiceInstance> adapter =
-                new DynamicHostSet.HostChangeMonitor<ServiceInstance>() {
-                    @Override
-                    public void onChange(ImmutableSet<ServiceInstance> serviceInstances) {
-                        Set<DLSocketAddress> converted = Sets.newHashSet();
-                        for (ServiceInstance instance : serviceInstances) {
-                            // NOTE(review): an instance without a "thrift" endpoint would
-                            // cause a NullPointerException here - confirm all registrants
-                            // publish it.
-                            Endpoint thrift = instance.getAdditionalEndpoints().get("thrift");
-                            InetSocketAddress inetAddr =
-                                    new InetSocketAddress(thrift.getHost(), thrift.getPort());
-                            int shardId;
-                            if (resolvedFromName) {
-                                shardId = -1;
-                            } else {
-                                shardId = instance.getShard();
-                            }
-                            converted.add(new DLSocketAddress(shardId, inetAddr));
-                        }
-                        monitor.onChange(ImmutableSet.copyOf(converted));
-                    }
-                };
-        try {
-            serverSet.watch(adapter);
-        } catch (DynamicHostSet.MonitorException me) {
-            throw new MonitorException("Failed to monitor server set : ", me);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/package-info.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/package-info.java
 
b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/package-info.java
deleted file mode 100644
index 352d755..0000000
--- 
a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Routing Mechanisms to route the traffic to the owner of streams.
- */
-package org.apache.distributedlog.client.routing;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/serverset/DLZkServerSet.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-client/src/main/java/org/apache/distributedlog/client/serverset/DLZkServerSet.java
 
b/distributedlog-client/src/main/java/org/apache/distributedlog/client/serverset/DLZkServerSet.java
deleted file mode 100644
index 93cdf7a..0000000
--- 
a/distributedlog-client/src/main/java/org/apache/distributedlog/client/serverset/DLZkServerSet.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.serverset;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.net.HostAndPort;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.zookeeper.ServerSet;
-import com.twitter.common.zookeeper.ServerSets;
-import com.twitter.common.zookeeper.ZooKeeperClient;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import org.apache.commons.lang.StringUtils;
-import org.apache.zookeeper.ZooDefs;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A wrapper over zookeeper client and its server set.
- */
-public class DLZkServerSet {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(DLZkServerSet.class);
-
-    static final String ZNODE_WRITE_PROXY = ".write_proxy";
-
-    private static String getZKServersFromDLUri(URI uri) {
-        return uri.getAuthority().replace(";", ",");
-    }
-
-    private static Iterable<InetSocketAddress> getZkAddresses(URI uri) {
-        String zkServers = getZKServersFromDLUri(uri);
-        String[] zkServerList = StringUtils.split(zkServers, ',');
-        ImmutableList.Builder<InetSocketAddress> builder = 
ImmutableList.builder();
-        for (String zkServer : zkServerList) {
-            HostAndPort hostAndPort = 
HostAndPort.fromString(zkServer).withDefaultPort(2181);
-            builder.add(InetSocketAddress.createUnresolved(
-                    hostAndPort.getHostText(),
-                    hostAndPort.getPort()));
-        }
-        return builder.build();
-    }
-
-    public static DLZkServerSet of(URI uri,
-                                   int zkSessionTimeoutMs) {
-        // Create zookeeper and server set
-        String zkPath = uri.getPath() + "/" + ZNODE_WRITE_PROXY;
-        Iterable<InetSocketAddress> zkAddresses = getZkAddresses(uri);
-        ZooKeeperClient zkClient =
-                new ZooKeeperClient(Amount.of(zkSessionTimeoutMs, 
Time.MILLISECONDS), zkAddresses);
-        ServerSet serverSet = ServerSets.create(zkClient, 
ZooDefs.Ids.OPEN_ACL_UNSAFE, zkPath);
-        return new DLZkServerSet(zkClient, serverSet);
-    }
-
-    private final ZooKeeperClient zkClient;
-    private final ServerSet zkServerSet;
-
-    public DLZkServerSet(ZooKeeperClient zkClient,
-                         ServerSet zkServerSet) {
-        this.zkClient = zkClient;
-        this.zkServerSet = zkServerSet;
-    }
-
-    public ZooKeeperClient getZkClient() {
-        return zkClient;
-    }
-
-    public ServerSet getServerSet() {
-        return zkServerSet;
-    }
-
-    public void close() {
-        zkClient.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/serverset/package-info.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-client/src/main/java/org/apache/distributedlog/client/serverset/package-info.java
 
b/distributedlog-client/src/main/java/org/apache/distributedlog/client/serverset/package-info.java
deleted file mode 100644
index 38a7544..0000000
--- 
a/distributedlog-client/src/main/java/org/apache/distributedlog/client/serverset/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Utils related to server set.
- */
-package org.apache.distributedlog.client.serverset;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/DefaultSpeculativeRequestExecutionPolicy.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/DefaultSpeculativeRequestExecutionPolicy.java
 
b/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/DefaultSpeculativeRequestExecutionPolicy.java
deleted file mode 100644
index f1da33c..0000000
--- 
a/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/DefaultSpeculativeRequestExecutionPolicy.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.speculative;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Default implementation of {@link SpeculativeRequestExecutionPolicy}.
- *
- * <p>Each time a speculative request completes with {@code true}, the next request is
- * scheduled after {@code nextSpeculativeRequestTimeout} milliseconds, and that delay is
- * then multiplied by {@code backoffMultiplier} and capped at
- * {@code maxSpeculativeRequestTimeout}.
- */
-public class DefaultSpeculativeRequestExecutionPolicy implements SpeculativeRequestExecutionPolicy {
-
-    private static final Logger LOG = LoggerFactory.getLogger(DefaultSpeculativeRequestExecutionPolicy.class);
-    final int firstSpeculativeRequestTimeout;
-    final int maxSpeculativeRequestTimeout;
-    final float backoffMultiplier;
-    // Delay, in milliseconds, applied to the next scheduled speculative request.
-    // NOTE(review): updated from the future callback without synchronization - confirm this is intended.
-    int nextSpeculativeRequestTimeout;
-
-    /**
-     * Creates the policy.
-     *
-     * @param firstSpeculativeRequestTimeout delay, in milliseconds, before the first scheduled request
-     * @param maxSpeculativeRequestTimeout upper bound, in milliseconds, for the delay between requests
-     * @param backoffMultiplier factor applied to the delay after each scheduled request
-     * @throws IllegalArgumentException if {@code backoffMultiplier} is not positive, or if
-     *         {@code maxSpeculativeRequestTimeout * backoffMultiplier} rounds to more than
-     *         {@link Integer#MAX_VALUE}
-     */
-    public DefaultSpeculativeRequestExecutionPolicy(int firstSpeculativeRequestTimeout,
-                                                    int maxSpeculativeRequestTimeout,
-                                                    float backoffMultiplier) {
-        this.firstSpeculativeRequestTimeout = firstSpeculativeRequestTimeout;
-        this.maxSpeculativeRequestTimeout = maxSpeculativeRequestTimeout;
-        this.backoffMultiplier = backoffMultiplier;
-        this.nextSpeculativeRequestTimeout = firstSpeculativeRequestTimeout;
-
-        if (backoffMultiplier <= 0) {
-            throw new IllegalArgumentException("Invalid value provided for backoffMultiplier");
-        }
-
-        // Prevent potential over flow
-        if (Math.round((double) maxSpeculativeRequestTimeout * (double) backoffMultiplier) > Integer.MAX_VALUE) {
-            throw new IllegalArgumentException("Invalid values for maxSpeculativeRequestTimeout and backoffMultiplier");
-        }
-    }
-
-    @VisibleForTesting
-    int getNextSpeculativeRequestTimeout() {
-        return nextSpeculativeRequestTimeout;
-    }
-
-    /**
-     * Initialize the speculative request execution policy.
-     *
-     * @param scheduler The scheduler service to issue the speculative request
-     * @param requestExecutor The executor is used to issue the actual speculative requests
-     */
-    @Override
-    public void initiateSpeculativeRequest(final ScheduledExecutorService scheduler,
-                                           final SpeculativeRequestExecutor requestExecutor) {
-        issueSpeculativeRequest(scheduler, requestExecutor);
-    }
-
-    /**
-     * Issues one speculative request and, once its future completes with {@code true},
-     * schedules the next one and backs off the delay.
-     */
-    private void issueSpeculativeRequest(final ScheduledExecutorService scheduler,
-                                         final SpeculativeRequestExecutor requestExecutor) {
-        Future<Boolean> issueNextRequest = requestExecutor.issueSpeculativeRequest();
-        issueNextRequest.addEventListener(new FutureEventListener<Boolean>() {
-            // we want this handler to run immediately after we push the big red button!
-            @Override
-            public void onSuccess(Boolean issueNextRequest) {
-                if (issueNextRequest) {
-                    // Schedule with the current delay, then grow the delay for the following round
-                    scheduleSpeculativeRequest(scheduler, requestExecutor, nextSpeculativeRequestTimeout);
-                    nextSpeculativeRequestTimeout = Math.min(maxSpeculativeRequestTimeout,
-                            (int) (nextSpeculativeRequestTimeout * backoffMultiplier));
-                } else {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("Stopped issuing speculative requests for {}, "
-                            + "speculativeReadTimeout = {}", requestExecutor, nextSpeculativeRequestTimeout);
-                    }
-                }
-            }
-
-            @Override
-            public void onFailure(Throwable thrown) {
-                LOG.warn("Failed to issue speculative request for {}, speculativeReadTimeout = {} : ",
-                        new Object[] { requestExecutor, nextSpeculativeRequestTimeout, thrown });
-            }
-        });
-    }
-
-    /**
-     * Schedules {@link #issueSpeculativeRequest} to run after the given delay in milliseconds.
-     * A rejected schedule is logged unless the scheduler has been shut down.
-     */
-    private void scheduleSpeculativeRequest(final ScheduledExecutorService scheduler,
-                                            final SpeculativeRequestExecutor requestExecutor,
-                                            final int speculativeRequestTimeout) {
-        try {
-            scheduler.schedule(new Runnable() {
-                @Override
-                public void run() {
-                    issueSpeculativeRequest(scheduler, requestExecutor);
-                }
-            }, speculativeRequestTimeout, TimeUnit.MILLISECONDS);
-        } catch (RejectedExecutionException re) {
-            if (!scheduler.isShutdown()) {
-                LOG.warn("Failed to schedule speculative request for {}, speculativeReadTimeout = {} : ",
-                        new Object[]{requestExecutor, speculativeRequestTimeout, re});
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/SpeculativeRequestExecutionPolicy.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/SpeculativeRequestExecutionPolicy.java
 
b/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/SpeculativeRequestExecutionPolicy.java
deleted file mode 100644
index faf45c2..0000000
--- 
a/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/SpeculativeRequestExecutionPolicy.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.speculative;
-
-import java.util.concurrent.ScheduledExecutorService;
-
-/**
- * Policy controlling how speculative requests are executed.
- */
-public interface SpeculativeRequestExecutionPolicy {
-    /**
-     * Initializes the speculative request execution policy and initiates requests.
-     *
-     * @param scheduler scheduler service used to issue the speculative request
-     * @param requestExecutor executor used to issue the actual speculative requests
-     */
-    void initiateSpeculativeRequest(ScheduledExecutorService scheduler,
-                                    SpeculativeRequestExecutor requestExecutor);
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/SpeculativeRequestExecutor.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/SpeculativeRequestExecutor.java
 
b/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/SpeculativeRequestExecutor.java
deleted file mode 100644
index 68fe8b0..0000000
--- 
a/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/SpeculativeRequestExecutor.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.speculative;
-
-import com.twitter.util.Future;
-
-/**
- * Executor that issues speculative requests.
- */
-public interface SpeculativeRequestExecutor {
-
-    /**
-     * Issues one speculative request and reports whether further speculative requests should follow.
-     *
-     * @return a future that is {@code true} when more speculative requests should be issued
-     */
-    Future<Boolean> issueSpeculativeRequest();
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/package-info.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/package-info.java
 
b/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/package-info.java
deleted file mode 100644
index 4bdd4b1..0000000
--- 
a/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Speculative Mechanism.
- */
-package org.apache.distributedlog.client.speculative;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/ClientStats.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/ClientStats.java
 
b/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/ClientStats.java
deleted file mode 100644
index c2dcddd..0000000
--- 
a/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/ClientStats.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.stats;
-
-import org.apache.distributedlog.client.resolver.RegionResolver;
-import org.apache.distributedlog.thrift.service.StatusCode;
-import com.twitter.finagle.stats.StatsReceiver;
-import java.net.SocketAddress;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * Client stats, recorded globally and optionally once more per region.
- */
-public class ClientStats {
-
-    // Maps a socket address to the region name used to scope region stats
-    private final RegionResolver regionResolver;
-
-    // Root receiver, global logger, and lazily filled per-region / per-op caches
-    private final StatsReceiver statsReceiver;
-    private final ClientStatsLogger clientStatsLogger;
-    private final boolean enableRegionStats;
-    private final ConcurrentMap<String, ClientStatsLogger> regionClientStatsLoggers;
-    private final ConcurrentMap<String, OpStats> opStatsMap;
-
-    /**
-     * Creates the client stats.
-     *
-     * @param statsReceiver receiver under which all stats are scoped
-     * @param enableRegionStats whether stats are also recorded per region
-     * @param regionResolver resolver turning an address into a region name
-     */
-    public ClientStats(StatsReceiver statsReceiver,
-                       boolean enableRegionStats,
-                       RegionResolver regionResolver) {
-        this.regionResolver = regionResolver;
-        this.enableRegionStats = enableRegionStats;
-        this.statsReceiver = statsReceiver;
-        this.clientStatsLogger = new ClientStatsLogger(statsReceiver);
-        this.regionClientStatsLoggers = new ConcurrentHashMap<String, ClientStatsLogger>();
-        this.opStatsMap = new ConcurrentHashMap<String, OpStats>();
-    }
-
-    /**
-     * Returns the {@link OpStats} for an operation, creating it under
-     * {@code statsReceiver.scope(op)} on first use.
-     *
-     * @param op operation name
-     * @return the op stats registered for {@code op}
-     */
-    public OpStats getOpStats(String op) {
-        OpStats cached = opStatsMap.get(op);
-        if (null == cached) {
-            OpStats created = new OpStats(statsReceiver.scope(op),
-                    enableRegionStats, regionResolver);
-            // Another thread may have registered first; keep whichever entry is in the map
-            OpStats raced = opStatsMap.putIfAbsent(op, created);
-            cached = (null != raced) ? raced : created;
-        }
-        return cached;
-    }
-
-    private ClientStatsLogger getRegionClientStatsLogger(SocketAddress address) {
-        return getRegionClientStatsLogger(regionResolver.resolveRegion(address));
-    }
-
-    private ClientStatsLogger getRegionClientStatsLogger(String region) {
-        ClientStatsLogger cached = regionClientStatsLoggers.get(region);
-        if (null != cached) {
-            return cached;
-        }
-        ClientStatsLogger created = new ClientStatsLogger(statsReceiver.scope(region));
-        ClientStatsLogger raced = regionClientStatsLoggers.putIfAbsent(region, created);
-        return (null == raced) ? created : raced;
-    }
-
-    /**
-     * Returns the stats receiver for the given address: the region-scoped one when region
-     * stats are enabled and the address is non-null, the global one otherwise.
-     *
-     * @param addr address of the remote host, may be {@code null}
-     * @return the stats receiver to use
-     */
-    public StatsReceiver getFinagleStatsReceiver(SocketAddress addr) {
-        boolean perRegion = enableRegionStats && null != addr;
-        ClientStatsLogger logger = perRegion ? getRegionClientStatsLogger(addr) : clientStatsLogger;
-        return logger.getStatsReceiver();
-    }
-
-    /**
-     * Records a completed proxy request globally and, if enabled, for the region of {@code addr}.
-     *
-     * @param addr address of the remote host, may be {@code null}
-     * @param code status code of the response
-     * @param startTimeNanos {@link System#nanoTime()} value taken when the request started
-     */
-    public void completeProxyRequest(SocketAddress addr, StatusCode code, long startTimeNanos) {
-        clientStatsLogger.completeProxyRequest(code, startTimeNanos);
-        if (!enableRegionStats || null == addr) {
-            return;
-        }
-        getRegionClientStatsLogger(addr).completeProxyRequest(code, startTimeNanos);
-    }
-
-    /**
-     * Records a failed proxy request globally and, if enabled, for the region of {@code addr}.
-     *
-     * @param addr address of the remote host, may be {@code null}
-     * @param cause failure cause
-     * @param startTimeNanos {@link System#nanoTime()} value taken when the request started
-     */
-    public void failProxyRequest(SocketAddress addr, Throwable cause, long startTimeNanos) {
-        clientStatsLogger.failProxyRequest(cause, startTimeNanos);
-        if (!enableRegionStats || null == addr) {
-            return;
-        }
-        getRegionClientStatsLogger(addr).failProxyRequest(cause, startTimeNanos);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/ClientStatsLogger.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/ClientStatsLogger.java
 
b/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/ClientStatsLogger.java
deleted file mode 100644
index 530c632..0000000
--- 
a/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/ClientStatsLogger.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.stats;
-
-import org.apache.distributedlog.thrift.service.StatusCode;
-import com.twitter.finagle.stats.Counter;
-import com.twitter.finagle.stats.Stat;
-import com.twitter.finagle.stats.StatsReceiver;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Stats logger collecting client stats: response counters per status code, exception
- * counters per exception class, and proxy request latencies.
- */
-public class ClientStatsLogger {
-
-    // Receivers and lazily created counters
-    private final StatsReceiver statsReceiver;
-    private final StatsReceiver responseStatsReceiver;
-    private final ConcurrentMap<StatusCode, Counter> responseStats =
-            new ConcurrentHashMap<StatusCode, Counter>();
-    private final StatsReceiver exceptionStatsReceiver;
-    private final ConcurrentMap<Class<?>, Counter> exceptionStats =
-            new ConcurrentHashMap<Class<?>, Counter>();
-
-    // Latency stats, in microseconds
-    private final Stat proxySuccessLatencyStat;
-    private final Stat proxyFailureLatencyStat;
-
-    /**
-     * Creates the logger with its "responses", "exceptions" and "proxy_request_latency" scopes.
-     *
-     * @param statsReceiver receiver under which the scopes are created
-     */
-    public ClientStatsLogger(StatsReceiver statsReceiver) {
-        this.statsReceiver = statsReceiver;
-        this.responseStatsReceiver = statsReceiver.scope("responses");
-        this.exceptionStatsReceiver = statsReceiver.scope("exceptions");
-        final StatsReceiver latencyScope = statsReceiver.scope("proxy_request_latency");
-        this.proxySuccessLatencyStat = latencyScope.stat0("success");
-        this.proxyFailureLatencyStat = latencyScope.stat0("failure");
-    }
-
-    /**
-     * Returns the receiver this logger was created with.
-     *
-     * @return the stats receiver
-     */
-    public StatsReceiver getStatsReceiver() {
-        return this.statsReceiver;
-    }
-
-    private Counter getResponseCounter(StatusCode code) {
-        Counter existing = responseStats.get(code);
-        if (null != existing) {
-            return existing;
-        }
-        Counter created = responseStatsReceiver.counter0(code.name());
-        // Keep the counter that is in the map if another thread registered one first
-        Counter raced = responseStats.putIfAbsent(code, created);
-        return (null == raced) ? created : raced;
-    }
-
-    private Counter getExceptionCounter(Class<?> cls) {
-        Counter existing = exceptionStats.get(cls);
-        if (null != existing) {
-            return existing;
-        }
-        Counter created = exceptionStatsReceiver.counter0(cls.getName());
-        Counter raced = exceptionStats.putIfAbsent(cls, created);
-        return (null == raced) ? created : raced;
-    }
-
-    /**
-     * Counts a response with the given status code and records the success latency.
-     *
-     * @param code status code of the response
-     * @param startTimeNanos {@link System#nanoTime()} value taken when the request started
-     */
-    public void completeProxyRequest(StatusCode code, long startTimeNanos) {
-        final Counter responses = getResponseCounter(code);
-        responses.incr();
-        proxySuccessLatencyStat.add(elapsedMicroSec(startTimeNanos));
-    }
-
-    /**
-     * Counts an exception by the class of {@code cause} and records the failure latency.
-     *
-     * @param cause failure cause
-     * @param startTimeNanos {@link System#nanoTime()} value taken when the request started
-     */
-    public void failProxyRequest(Throwable cause, long startTimeNanos) {
-        final Counter exceptions = getExceptionCounter(cause.getClass());
-        exceptions.incr();
-        proxyFailureLatencyStat.add(elapsedMicroSec(startTimeNanos));
-    }
-
-    /**
-     * Returns the microseconds elapsed since {@code startNanoTime}.
-     *
-     * @param startNanoTime earlier {@link System#nanoTime()} value
-     * @return elapsed time in microseconds
-     */
-    static long elapsedMicroSec(long startNanoTime) {
-        final long elapsedNanos = System.nanoTime() - startNanoTime;
-        return TimeUnit.NANOSECONDS.toMicros(elapsedNanos);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OpStats.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OpStats.java
 
b/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OpStats.java
deleted file mode 100644
index 7a49faa..0000000
--- 
a/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OpStats.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.stats;
-
-import org.apache.distributedlog.client.resolver.RegionResolver;
-import com.twitter.finagle.stats.StatsReceiver;
-import java.net.SocketAddress;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * Stats of a single operation, recorded globally and optionally once more per region.
- */
-public class OpStats {
-
-    // Maps a socket address to the region name used to scope region stats
-    private final RegionResolver regionResolver;
-
-    // Root receiver, global logger, and lazily filled per-region cache
-    private final StatsReceiver statsReceiver;
-    private final OpStatsLogger opStatsLogger;
-    private final boolean enableRegionStats;
-    private final ConcurrentMap<String, OpStatsLogger> regionOpStatsLoggers;
-
-    /**
-     * Creates the op stats.
-     *
-     * @param statsReceiver receiver under which the stats are scoped
-     * @param enableRegionStats whether stats are also recorded per region
-     * @param regionResolver resolver turning an address into a region name
-     */
-    public OpStats(StatsReceiver statsReceiver,
-                   boolean enableRegionStats,
-                   RegionResolver regionResolver) {
-        this.regionResolver = regionResolver;
-        this.enableRegionStats = enableRegionStats;
-        this.statsReceiver = statsReceiver;
-        this.opStatsLogger = new OpStatsLogger(statsReceiver);
-        this.regionOpStatsLoggers = new ConcurrentHashMap<String, OpStatsLogger>();
-    }
-
-    private OpStatsLogger getRegionOpStatsLogger(SocketAddress address) {
-        return getRegionOpStatsLogger(regionResolver.resolveRegion(address));
-    }
-
-    private OpStatsLogger getRegionOpStatsLogger(String region) {
-        OpStatsLogger cached = regionOpStatsLoggers.get(region);
-        if (null != cached) {
-            return cached;
-        }
-        OpStatsLogger created = new OpStatsLogger(statsReceiver.scope(region));
-        // Keep the logger that is in the map if another thread registered one first
-        OpStatsLogger raced = regionOpStatsLoggers.putIfAbsent(region, created);
-        return (null == raced) ? created : raced;
-    }
-
-    /**
-     * Records a completed request globally and, if enabled, for the region of {@code addr}.
-     *
-     * @param addr address of the remote host, may be {@code null}
-     * @param micros request latency in microseconds
-     * @param numTries number of tries the request took
-     */
-    public void completeRequest(SocketAddress addr, long micros, int numTries) {
-        opStatsLogger.completeRequest(micros, numTries);
-        if (!enableRegionStats || null == addr) {
-            return;
-        }
-        getRegionOpStatsLogger(addr).completeRequest(micros, numTries);
-    }
-
-    /**
-     * Records a failed request globally and, if enabled, for the region of {@code addr}.
-     *
-     * @param addr address of the remote host, may be {@code null}
-     * @param micros request latency in microseconds
-     * @param numTries number of tries the request took
-     */
-    public void failRequest(SocketAddress addr, long micros, int numTries) {
-        opStatsLogger.failRequest(micros, numTries);
-        if (!enableRegionStats || null == addr) {
-            return;
-        }
-        getRegionOpStatsLogger(addr).failRequest(micros, numTries);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OpStatsLogger.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OpStatsLogger.java
 
b/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OpStatsLogger.java
deleted file mode 100644
index b94b4be..0000000
--- 
a/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OpStatsLogger.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.stats;
-
-import com.twitter.finagle.stats.Stat;
-import com.twitter.finagle.stats.StatsReceiver;
-
-/**
- * Stats logger for one operation type: success latency, failure latency and number of tries.
- */
-public class OpStatsLogger {
-
-    private final Stat successLatencyStat;
-    private final Stat failureLatencyStat;
-    private final Stat redirectStat;
-
-    /**
-     * Creates the "latency/success", "latency/failure" and "redirects/times" stats.
-     *
-     * @param statsReceiver receiver under which the stats are scoped
-     */
-    public OpStatsLogger(StatsReceiver statsReceiver) {
-        final StatsReceiver latencyScope = statsReceiver.scope("latency");
-        this.successLatencyStat = latencyScope.stat0("success");
-        this.failureLatencyStat = latencyScope.stat0("failure");
-        final StatsReceiver redirectScope = statsReceiver.scope("redirects");
-        this.redirectStat = redirectScope.stat0("times");
-    }
-
-    /**
-     * Records a successful request.
-     *
-     * @param micros value added to the success latency stat
-     * @param numTries value added to the redirect stat
-     */
-    public void completeRequest(long micros, int numTries) {
-        this.successLatencyStat.add(micros);
-        this.redirectStat.add(numTries);
-    }
-
-    /**
-     * Records a failed request.
-     *
-     * @param micros value added to the failure latency stat
-     * @param numTries value added to the redirect stat
-     */
-    public void failRequest(long micros, int numTries) {
-        this.failureLatencyStat.add(micros);
-        this.redirectStat.add(numTries);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OwnershipStatsLogger.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OwnershipStatsLogger.java
 
b/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OwnershipStatsLogger.java
deleted file mode 100644
index 110e99a..0000000
--- 
a/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OwnershipStatsLogger.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.stats;
-
-import com.twitter.finagle.stats.Counter;
-import com.twitter.finagle.stats.StatsReceiver;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * Stats logger for ownerships: every event is counted once globally and once per stream.
- */
-public class OwnershipStatsLogger {
-
-    /**
-     * Counters for the ownership events: hits, misses, adds, removes and redirects.
-     */
-    public static class OwnershipStat {
-        private final Counter hits;
-        private final Counter misses;
-        private final Counter removes;
-        private final Counter redirects;
-        private final Counter adds;
-
-        OwnershipStat(StatsReceiver ownershipStats) {
-            this.hits = ownershipStats.counter0("hits");
-            this.misses = ownershipStats.counter0("misses");
-            this.adds = ownershipStats.counter0("adds");
-            this.removes = ownershipStats.counter0("removes");
-            this.redirects = ownershipStats.counter0("redirects");
-        }
-
-        /** Increments the "hits" counter. */
-        public void onHit() {
-            this.hits.incr();
-        }
-
-        /** Increments the "misses" counter. */
-        public void onMiss() {
-            this.misses.incr();
-        }
-
-        /** Increments the "adds" counter. */
-        public void onAdd() {
-            this.adds.incr();
-        }
-
-        /** Increments the "removes" counter. */
-        public void onRemove() {
-            this.removes.incr();
-        }
-
-        /** Increments the "redirects" counter. */
-        public void onRedirect() {
-            this.redirects.incr();
-        }
-
-    }
-
-    // Global counters, plus lazily created counters per stream
-    private final OwnershipStat ownershipStat;
-    private final StatsReceiver ownershipStatsReceiver;
-    private final ConcurrentMap<String, OwnershipStat> ownershipStats =
-            new ConcurrentHashMap<String, OwnershipStat>();
-
-    /**
-     * Creates the logger.
-     *
-     * @param statsReceiver receiver whose "ownership" scope holds the global counters
-     * @param streamStatsReceiver receiver whose "perstream_ownership" scope holds the per-stream counters
-     */
-    public OwnershipStatsLogger(StatsReceiver statsReceiver,
-                                StatsReceiver streamStatsReceiver) {
-        final StatsReceiver globalScope = statsReceiver.scope("ownership");
-        this.ownershipStat = new OwnershipStat(globalScope);
-        this.ownershipStatsReceiver = streamStatsReceiver.scope("perstream_ownership");
-    }
-
-    private OwnershipStat getOwnershipStat(String stream) {
-        OwnershipStat existing = ownershipStats.get(stream);
-        if (null != existing) {
-            return existing;
-        }
-        OwnershipStat created = new OwnershipStat(ownershipStatsReceiver.scope(stream));
-        // Keep the stat that is in the map if another thread registered one first
-        OwnershipStat raced = ownershipStats.putIfAbsent(stream, created);
-        return (null == raced) ? created : raced;
-    }
-
-    /**
-     * Counts a miss globally and for the given stream.
-     *
-     * @param stream stream name
-     */
-    public void onMiss(String stream) {
-        ownershipStat.onMiss();
-        final OwnershipStat perStream = getOwnershipStat(stream);
-        perStream.onMiss();
-    }
-
-    /**
-     * Counts a hit globally and for the given stream.
-     *
-     * @param stream stream name
-     */
-    public void onHit(String stream) {
-        ownershipStat.onHit();
-        final OwnershipStat perStream = getOwnershipStat(stream);
-        perStream.onHit();
-    }
-
-    /**
-     * Counts a redirect globally and for the given stream.
-     *
-     * @param stream stream name
-     */
-    public void onRedirect(String stream) {
-        ownershipStat.onRedirect();
-        final OwnershipStat perStream = getOwnershipStat(stream);
-        perStream.onRedirect();
-    }
-
-    /**
-     * Counts a remove globally and for the given stream.
-     *
-     * @param stream stream name
-     */
-    public void onRemove(String stream) {
-        ownershipStat.onRemove();
-        final OwnershipStat perStream = getOwnershipStat(stream);
-        perStream.onRemove();
-    }
-
-    /**
-     * Counts an add globally and for the given stream.
-     *
-     * @param stream stream name
-     */
-    public void onAdd(String stream) {
-        ownershipStat.onAdd();
-        final OwnershipStat perStream = getOwnershipStat(stream);
-        perStream.onAdd();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/package-info.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/package-info.java
 
b/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/package-info.java
deleted file mode 100644
index 106d3fc..0000000
--- 
a/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Client side stats utils.
- */
-package org.apache.distributedlog.client.stats;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/service/DLSocketAddress.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/service/DLSocketAddress.java b/distributedlog-client/src/main/java/org/apache/distributedlog/service/DLSocketAddress.java
deleted file mode 100644
index 68e6825..0000000
--- a/distributedlog-client/src/main/java/org/apache/distributedlog/service/DLSocketAddress.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-
-/**
- * Socket Address identifier for a DL proxy.
- */
-public class DLSocketAddress {
-
-    private static final int VERSION = 1;
-
-    private static final String COLON = ":";
-    private static final String SEP = ";";
-
-    private final int shard;
-    private final InetSocketAddress socketAddress;
-
-    public DLSocketAddress(int shard, InetSocketAddress socketAddress) {
-        this.shard = shard;
-        this.socketAddress = socketAddress;
-    }
-
-    /**
-     * Shard id for dl write proxy.
-     *
-     * @return shard id for dl write proxy.
-     */
-    public int getShard() {
-        return shard;
-    }
-
-    /**
-     * Socket address for dl write proxy.
-     *
-     * @return socket address for dl write proxy
-     */
-    public InetSocketAddress getSocketAddress() {
-        return socketAddress;
-    }
-
-    /**
-     * Serialize the write proxy identifier to string.
-     *
-     * @return serialized write proxy identifier.
-     */
-    public String serialize() {
-        return toLockId(socketAddress, shard);
-    }
-
-    @Override
-    public int hashCode() {
-        return socketAddress.hashCode();
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (!(obj instanceof DLSocketAddress)) {
-            return false;
-        }
-        DLSocketAddress other = (DLSocketAddress) obj;
-        return shard == other.shard && socketAddress.equals(other.socketAddress);
-    }
-
-    @Override
-    public String toString() {
-        return toLockId(socketAddress, shard);
-    }
-
-    /**
-     * Deserialize proxy address from a string representation.
-     *
-     * @param lockId
-     *          string representation of the proxy address.
-     * @return proxy address.
-     * @throws IOException
-     */
-    public static DLSocketAddress deserialize(String lockId) throws IOException {
-        String parts[] = lockId.split(SEP);
-        if (3 != parts.length) {
-            throw new IOException("Invalid dl socket address " + lockId);
-        }
-        int version;
-        try {
-            version = Integer.parseInt(parts[0]);
-        } catch (NumberFormatException nfe) {
-            throw new IOException("Invalid version found in " + lockId, nfe);
-        }
-        if (VERSION != version) {
-            throw new IOException("Invalid version " + version + " found in " + lockId + ", expected " + VERSION);
-        }
-        int shardId;
-        try {
-            shardId = Integer.parseInt(parts[1]);
-        } catch (NumberFormatException nfe) {
-            throw new IOException("Invalid shard id found in " + lockId, nfe);
-        }
-        InetSocketAddress address = parseSocketAddress(parts[2]);
-        return new DLSocketAddress(shardId, address);
-    }
-
-    /**
-     * Parse the inet socket address from the string representation.
-     *
-     * @param addr
-     *          string representation
-     * @return inet socket address
-     */
-    public static InetSocketAddress parseSocketAddress(String addr) {
-        String[] parts =  addr.split(COLON);
-        checkArgument(parts.length == 2);
-        String hostname = parts[0];
-        int port = Integer.parseInt(parts[1]);
-        return new InetSocketAddress(hostname, port);
-    }
-
-    public static InetSocketAddress getSocketAddress(int port) throws UnknownHostException {
-        return new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), port);
-    }
-
-    /**
-     * Convert inet socket address to the string representation.
-     *
-     * @param address
-     *          inet socket address.
-     * @return string representation of inet socket address.
-     */
-    public static String toString(InetSocketAddress address) {
-        StringBuilder sb = new StringBuilder();
-        sb.append(address.getHostName()).append(COLON).append(address.getPort());
-        return sb.toString();
-    }
-
-    public static String toLockId(InetSocketAddress address, int shard) {
-        StringBuilder sb = new StringBuilder();
-        sb.append(VERSION).append(SEP).append(shard).append(SEP).append(toString(address));
-        return sb.toString();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/service/DistributedLogClient.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/service/DistributedLogClient.java b/distributedlog-client/src/main/java/org/apache/distributedlog/service/DistributedLogClient.java
deleted file mode 100644
index 9f30815..0000000
--- a/distributedlog-client/src/main/java/org/apache/distributedlog/service/DistributedLogClient.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-import org.apache.distributedlog.DLSN;
-import org.apache.distributedlog.LogRecordSetBuffer;
-import com.twitter.util.Future;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-/**
- * Interface for distributedlog client.
- */
-public interface DistributedLogClient {
-    /**
-     * Write <i>data</i> to a given <i>stream</i>.
-     *
-     * @param stream
-     *          Stream Name.
-     * @param data
-     *          Data to write.
-     * @return a future representing a sequence id returned for this write.
-     */
-    Future<DLSN> write(String stream, ByteBuffer data);
-
-    /**
-     * Write record set to a given <i>stream</i>.
-     *
-     * <p>The record set is built from {@link org.apache.distributedlog.LogRecordSet.Writer}
-     *
-     * @param stream stream to write to
-     * @param recordSet record set
-     */
-    Future<DLSN> writeRecordSet(String stream, LogRecordSetBuffer recordSet);
-
-    /**
-     * Write <i>data</i> in bulk to a given <i>stream</i>.
-     *
-     * <p>Return a list of Future dlsns, one for each submitted buffer. In the event of a partial
-     * failure--ex. some specific buffer write fails, all subsequent writes
-     * will also fail.
-     *
-     * @param stream
-     *          Stream Name.
-     * @param data
-     *          Data to write.
-     * @return a list of futures, one for each submitted buffer.
-     */
-    List<Future<DLSN>> writeBulk(String stream, List<ByteBuffer> data);
-
-    /**
-     * Truncate the stream to a given <i>dlsn</i>.
-     *
-     * @param stream
-     *          Stream Name.
-     * @param dlsn
-     *          DLSN to truncate until.
-     * @return a future representing the truncation.
-     */
-    Future<Boolean> truncate(String stream, DLSN dlsn);
-
-    /**
-     * Release the ownership of a stream <i>stream</i>.
-     *
-     * @param stream
-     *          Stream Name to release.
-     * @return a future representing the release operation.
-     */
-    Future<Void> release(String stream);
-
-    /**
-     * Delete a given stream <i>stream</i>.
-     *
-     * @param stream
-     *          Stream Name to delete.
-     * @return a future representing the delete operation.
-     */
-    Future<Void> delete(String stream);
-
-    /**
-     * Create a stream with name <i>stream</i>.
-     *
-     * @param stream
-     *          Stream Name to create.
-     * @return a future representing the create operation.
-     */
-    Future<Void> create(String stream);
-
-    /**
-     * Close the client.
-     */
-    void close();
-}

Reply via email to