http://git-wip-us.apache.org/repos/asf/zookeeper/blob/7291e47c/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java new file mode 100644 index 0000000..447a3ee --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java @@ -0,0 +1,1481 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper; + +import java.io.BufferedReader; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.lang.Thread.UncaughtExceptionHandler; +import java.net.ConnectException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.net.SocketException; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.LinkedBlockingQueue; + +import javax.security.auth.login.LoginException; +import javax.security.sasl.SaslException; + +import org.apache.jute.BinaryInputArchive; +import org.apache.jute.BinaryOutputArchive; +import org.apache.jute.Record; +import org.apache.zookeeper.AsyncCallback.ACLCallback; +import org.apache.zookeeper.AsyncCallback.Children2Callback; +import org.apache.zookeeper.AsyncCallback.ChildrenCallback; +import org.apache.zookeeper.AsyncCallback.DataCallback; +import org.apache.zookeeper.AsyncCallback.MultiCallback; +import org.apache.zookeeper.AsyncCallback.StatCallback; +import org.apache.zookeeper.AsyncCallback.StringCallback; +import org.apache.zookeeper.AsyncCallback.VoidCallback; +import org.apache.zookeeper.OpResult.ErrorResult; +import org.apache.zookeeper.Watcher.Event; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.ZooDefs.OpCode; +import org.apache.zookeeper.ZooKeeper.States; +import org.apache.zookeeper.ZooKeeper.WatchRegistration; +import org.apache.zookeeper.client.HostProvider; +import org.apache.zookeeper.client.ZooKeeperSaslClient; +import org.apache.zookeeper.common.Time; +import org.apache.zookeeper.proto.AuthPacket; +import 
org.apache.zookeeper.proto.ConnectRequest; +import org.apache.zookeeper.proto.CreateResponse; +import org.apache.zookeeper.proto.ExistsResponse; +import org.apache.zookeeper.proto.GetACLResponse; +import org.apache.zookeeper.proto.GetChildren2Response; +import org.apache.zookeeper.proto.GetChildrenResponse; +import org.apache.zookeeper.proto.GetDataResponse; +import org.apache.zookeeper.proto.GetSASLRequest; +import org.apache.zookeeper.proto.ReplyHeader; +import org.apache.zookeeper.proto.RequestHeader; +import org.apache.zookeeper.proto.SetACLResponse; +import org.apache.zookeeper.proto.SetDataResponse; +import org.apache.zookeeper.proto.SetWatches; +import org.apache.zookeeper.proto.WatcherEvent; +import org.apache.zookeeper.server.ByteBufferInputStream; +import org.apache.zookeeper.server.ZooKeeperThread; +import org.apache.zookeeper.server.ZooTrace; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class manages the socket i/o for the client. ClientCnxn maintains a list + * of available servers to connect to and "transparently" switches servers it is + * connected to as needed. + * + */ +public class ClientCnxn { + private static final Logger LOG = LoggerFactory.getLogger(ClientCnxn.class); + + /** + * Name of the system property that supplies the user-name part of the + * server principal passed to ZooKeeperSaslClient when a connection is + * started; "zookeeper" is used when the property is not set. + */ + private static final String ZK_SASL_CLIENT_USERNAME = + "zookeeper.sasl.client.username"; + + /* ZOOKEEPER-706: If a session has a large number of watches set then + * attempting to re-establish those watches after a connection loss may + * fail due to the SetWatches request exceeding the server's configured + * jute.maxBuffer value. To avoid this we instead split the watch + * re-establishment across multiple SetWatches calls. This constant + * controls the size of each call, measured as the summed String length + * of the watch paths in a batch. It is set to 128kB to be conservative + * with respect to the server's 1MB default for jute.maxBuffer. + */ + private static final int SET_WATCHES_MAX_LENGTH = 128 * 1024; + + /** This controls whether automatic watch resetting is enabled. 
+ * Clients automatically reset watches during session reconnect, this + * option allows the client to turn off this behavior by setting + * the environment variable "zookeeper.disableAutoWatchReset" to "true" */ + private static boolean disableAutoWatchReset; + static { + // this var should not be public, but otw there is no easy way + // to test + disableAutoWatchReset = + Boolean.getBoolean("zookeeper.disableAutoWatchReset"); + if (LOG.isDebugEnabled()) { + LOG.debug("zookeeper.disableAutoWatchReset is " + + disableAutoWatchReset); + } + } + + static class AuthData { + AuthData(String scheme, byte data[]) { + this.scheme = scheme; + this.data = data; + } + + String scheme; + + byte data[]; + } + + private final CopyOnWriteArraySet<AuthData> authInfo = new CopyOnWriteArraySet<AuthData>(); + + /** + * These are the packets that have been sent and are waiting for a response. + */ + private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>(); + + /** + * These are the packets that need to be sent. + */ + private final LinkedList<Packet> outgoingQueue = new LinkedList<Packet>(); + + private int connectTimeout; + + /** + * The timeout in ms the client negotiated with the server. This is the + * "real" timeout, not the timeout request by the client (which may have + * been increased/decreased by the server which applies bounds to this + * value. + */ + private volatile int negotiatedSessionTimeout; + + private int readTimeout; + + private final int sessionTimeout; + + private final ZooKeeper zooKeeper; + + private final ClientWatchManager watcher; + + private long sessionId; + + private byte sessionPasswd[] = new byte[16]; + + /** + * If true, the connection is allowed to go to r-o mode. This field's value + * is sent, besides other data, during session creation handshake. If the + * server on the other side of the wire is partitioned it'll accept + * read-only clients only. 
+ */ + private boolean readOnly; + + final String chrootPath; + + final SendThread sendThread; + + final EventThread eventThread; + + /** + * Set to true when close is called. Latches the connection such that we + * don't attempt to re-connect to the server if in the middle of closing the + * connection (client sends session disconnect to server as part of close + * operation) + */ + private volatile boolean closing = false; + + /** + * A set of ZooKeeper hosts this client could connect to. + */ + private final HostProvider hostProvider; + + /** + * Is set to true when a connection to a r/w server is established for the + * first time; never changed afterwards. + * <p> + * Is used to handle situations when client without sessionId connects to a + * read-only server. Such client receives "fake" sessionId from read-only + * server, but this sessionId is invalid for other servers. So when such + * client finds a r/w server, it sends 0 instead of fake sessionId during + * connection handshake and establishes new, valid session. + * <p> + * If this field is false (which implies we haven't seen r/w server before) + * then non-zero sessionId is fake, otherwise it is valid. 
+ */ + volatile boolean seenRwServerBefore = false; + + + public ZooKeeperSaslClient zooKeeperSaslClient; + + public long getSessionId() { + return sessionId; + } + + public byte[] getSessionPasswd() { + return sessionPasswd; + } + + public int getSessionTimeout() { + return negotiatedSessionTimeout; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + + SocketAddress local = sendThread.getClientCnxnSocket().getLocalSocketAddress(); + SocketAddress remote = sendThread.getClientCnxnSocket().getRemoteSocketAddress(); + sb + .append("sessionid:0x").append(Long.toHexString(getSessionId())) + .append(" local:").append(local) + .append(" remoteserver:").append(remote) + .append(" lastZxid:").append(lastZxid) + .append(" xid:").append(xid) + .append(" sent:").append(sendThread.getClientCnxnSocket().getSentCount()) + .append(" recv:").append(sendThread.getClientCnxnSocket().getRecvCount()) + .append(" queuedpkts:").append(outgoingQueue.size()) + .append(" pendingresp:").append(pendingQueue.size()) + .append(" queuedevents:").append(eventThread.waitingEvents.size()); + + return sb.toString(); + } + + /** + * This class allows us to pass the headers and the relevant records around. 
+ */ + static class Packet { + RequestHeader requestHeader; + + ReplyHeader replyHeader; + + Record request; + + Record response; + + ByteBuffer bb; + + /** Client's view of the path (may differ due to chroot) **/ + String clientPath; + /** Servers's view of the path (may differ due to chroot) **/ + String serverPath; + + boolean finished; + + AsyncCallback cb; + + Object ctx; + + WatchRegistration watchRegistration; + + public boolean readOnly; + + /** Convenience ctor */ + Packet(RequestHeader requestHeader, ReplyHeader replyHeader, + Record request, Record response, + WatchRegistration watchRegistration) { + this(requestHeader, replyHeader, request, response, + watchRegistration, false); + } + + Packet(RequestHeader requestHeader, ReplyHeader replyHeader, + Record request, Record response, + WatchRegistration watchRegistration, boolean readOnly) { + + this.requestHeader = requestHeader; + this.replyHeader = replyHeader; + this.request = request; + this.response = response; + this.readOnly = readOnly; + this.watchRegistration = watchRegistration; + } + + public void createBB() { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); + boa.writeInt(-1, "len"); // We'll fill this in later + if (requestHeader != null) { + requestHeader.serialize(boa, "header"); + } + if (request instanceof ConnectRequest) { + request.serialize(boa, "connect"); + // append "am-I-allowed-to-be-readonly" flag + boa.writeBool(readOnly, "readOnly"); + } else if (request != null) { + request.serialize(boa, "request"); + } + baos.close(); + this.bb = ByteBuffer.wrap(baos.toByteArray()); + this.bb.putInt(this.bb.capacity() - 4); + this.bb.rewind(); + } catch (IOException e) { + LOG.warn("Ignoring unexpected exception", e); + } + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + + sb.append("clientPath:" + clientPath); + sb.append(" serverPath:" + serverPath); + sb.append(" 
finished:" + finished); + + sb.append(" header:: " + requestHeader); + sb.append(" replyHeader:: " + replyHeader); + sb.append(" request:: " + request); + sb.append(" response:: " + response); + + // jute toString is horrible, remove unnecessary newlines + return sb.toString().replaceAll("\r*\n+", " "); + } + } + + /** + * Creates a connection object. The actual network connect doesn't get + * established until needed. The start() instance method must be called + * subsequent to construction. + * + * @param chrootPath - the chroot of this client. Should be removed from this Class in ZOOKEEPER-838 + * @param hostProvider + * the list of ZooKeeper servers to connect to + * @param sessionTimeout + * the timeout for connections. + * @param zooKeeper + * the zookeeper object that this connection is related to. + * @param watcher watcher for this connection + * @param clientCnxnSocket + * the socket implementation used (e.g. NIO/Netty) + * @param canBeReadOnly + * whether the connection is allowed to go to read-only + * mode in case of partitioning + * @throws IOException + */ + public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, + ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, boolean canBeReadOnly) + throws IOException { + this(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher, + clientCnxnSocket, 0, new byte[16], canBeReadOnly); + } + + /** + * Creates a connection object. The actual network connect doesn't get + * established until needed. The start() instance method must be called + * subsequent to construction. + * + * @param chrootPath - the chroot of this client. Should be removed from this Class in ZOOKEEPER-838 + * @param hostProvider + * the list of ZooKeeper servers to connect to + * @param sessionTimeout + * the timeout for connections. + * @param zooKeeper + * the zookeeper object that this connection is related to. 
+ * @param watcher watcher for this connection + * @param clientCnxnSocket + * the socket implementation used (e.g. NIO/Netty) + * @param sessionId session id if re-establishing session + * @param sessionPasswd session passwd if re-establishing session + * @param canBeReadOnly + * whether the connection is allowed to go to read-only + * mode in case of partitioning + * @throws IOException + */ + public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, + ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, + long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) { + this.zooKeeper = zooKeeper; + this.watcher = watcher; + this.sessionId = sessionId; + this.sessionPasswd = sessionPasswd; + this.sessionTimeout = sessionTimeout; + this.hostProvider = hostProvider; + this.chrootPath = chrootPath; + + connectTimeout = sessionTimeout / hostProvider.size(); + readTimeout = sessionTimeout * 2 / 3; + readOnly = canBeReadOnly; + + sendThread = new SendThread(clientCnxnSocket); + eventThread = new EventThread(); + + } + + /** + * tests use this to check on reset of watches + * @return if the auto reset of watches are disabled + */ + public static boolean getDisableAutoResetWatch() { + return disableAutoWatchReset; + } + /** + * tests use this to set the auto reset + * @param b the value to set disable watches to + */ + public static void setDisableAutoResetWatch(boolean b) { + disableAutoWatchReset = b; + } + public void start() { + sendThread.start(); + eventThread.start(); + } + + private Object eventOfDeath = new Object(); + + private static class WatcherSetEventPair { + private final Set<Watcher> watchers; + private final WatchedEvent event; + + public WatcherSetEventPair(Set<Watcher> watchers, WatchedEvent event) { + this.watchers = watchers; + this.event = event; + } + } + + /** + * Guard against creating "-EventThread-EventThread-EventThread-..." 
thread + * names when ZooKeeper object is being created from within a watcher. + * See ZOOKEEPER-795 for details. + */ + private static String makeThreadName(String suffix) { + String name = Thread.currentThread().getName(). + replaceAll("-EventThread", ""); + return name + suffix; + } + + class EventThread extends ZooKeeperThread { + private final LinkedBlockingQueue<Object> waitingEvents = + new LinkedBlockingQueue<Object>(); + + /** This is really the queued session state until the event + * thread actually processes the event and hands it to the watcher. + * But for all intents and purposes this is the state. + */ + private volatile KeeperState sessionState = KeeperState.Disconnected; + + private volatile boolean wasKilled = false; + private volatile boolean isRunning = false; + + EventThread() { + super(makeThreadName("-EventThread")); + setDaemon(true); + } + + public void queueEvent(WatchedEvent event) { + if (event.getType() == EventType.None + && sessionState == event.getState()) { + return; + } + sessionState = event.getState(); + + // materialize the watchers based on the event + WatcherSetEventPair pair = new WatcherSetEventPair( + watcher.materialize(event.getState(), event.getType(), + event.getPath()), + event); + // queue the pair (watch set & event) for later processing + waitingEvents.add(pair); + } + + public void queuePacket(Packet packet) { + if (wasKilled) { + synchronized (waitingEvents) { + if (isRunning) waitingEvents.add(packet); + else processEvent(packet); + } + } else { + waitingEvents.add(packet); + } + } + + public void queueEventOfDeath() { + waitingEvents.add(eventOfDeath); + } + + @Override + public void run() { + try { + isRunning = true; + while (true) { + Object event = waitingEvents.take(); + if (event == eventOfDeath) { + wasKilled = true; + } else { + processEvent(event); + } + if (wasKilled) + synchronized (waitingEvents) { + if (waitingEvents.isEmpty()) { + isRunning = false; + break; + } + } + } + } catch 
(InterruptedException e) { + LOG.error("Event thread exiting due to interruption", e); + } + + LOG.info("EventThread shut down for session: 0x{}", + Long.toHexString(getSessionId())); + } + + private void processEvent(Object event) { + try { + if (event instanceof WatcherSetEventPair) { + // each watcher will process the event + WatcherSetEventPair pair = (WatcherSetEventPair) event; + for (Watcher watcher : pair.watchers) { + try { + watcher.process(pair.event); + } catch (Throwable t) { + LOG.error("Error while calling watcher ", t); + } + } + } else { + Packet p = (Packet) event; + int rc = 0; + String clientPath = p.clientPath; + if (p.replyHeader.getErr() != 0) { + rc = p.replyHeader.getErr(); + } + if (p.cb == null) { + LOG.warn("Somehow a null cb got to EventThread!"); + } else if (p.response instanceof ExistsResponse + || p.response instanceof SetDataResponse + || p.response instanceof SetACLResponse) { + StatCallback cb = (StatCallback) p.cb; + if (rc == 0) { + if (p.response instanceof ExistsResponse) { + cb.processResult(rc, clientPath, p.ctx, + ((ExistsResponse) p.response) + .getStat()); + } else if (p.response instanceof SetDataResponse) { + cb.processResult(rc, clientPath, p.ctx, + ((SetDataResponse) p.response) + .getStat()); + } else if (p.response instanceof SetACLResponse) { + cb.processResult(rc, clientPath, p.ctx, + ((SetACLResponse) p.response) + .getStat()); + } + } else { + cb.processResult(rc, clientPath, p.ctx, null); + } + } else if (p.response instanceof GetDataResponse) { + DataCallback cb = (DataCallback) p.cb; + GetDataResponse rsp = (GetDataResponse) p.response; + if (rc == 0) { + cb.processResult(rc, clientPath, p.ctx, rsp + .getData(), rsp.getStat()); + } else { + cb.processResult(rc, clientPath, p.ctx, null, + null); + } + } else if (p.response instanceof GetACLResponse) { + ACLCallback cb = (ACLCallback) p.cb; + GetACLResponse rsp = (GetACLResponse) p.response; + if (rc == 0) { + cb.processResult(rc, clientPath, p.ctx, rsp + 
.getAcl(), rsp.getStat()); + } else { + cb.processResult(rc, clientPath, p.ctx, null, + null); + } + } else if (p.response instanceof GetChildrenResponse) { + ChildrenCallback cb = (ChildrenCallback) p.cb; + GetChildrenResponse rsp = (GetChildrenResponse) p.response; + if (rc == 0) { + cb.processResult(rc, clientPath, p.ctx, rsp + .getChildren()); + } else { + cb.processResult(rc, clientPath, p.ctx, null); + } + } else if (p.response instanceof GetChildren2Response) { + Children2Callback cb = (Children2Callback) p.cb; + GetChildren2Response rsp = (GetChildren2Response) p.response; + if (rc == 0) { + cb.processResult(rc, clientPath, p.ctx, rsp + .getChildren(), rsp.getStat()); + } else { + cb.processResult(rc, clientPath, p.ctx, null, null); + } + } else if (p.response instanceof CreateResponse) { + StringCallback cb = (StringCallback) p.cb; + CreateResponse rsp = (CreateResponse) p.response; + if (rc == 0) { + cb.processResult(rc, clientPath, p.ctx, + (chrootPath == null + ? rsp.getPath() + : rsp.getPath() + .substring(chrootPath.length()))); + } else { + cb.processResult(rc, clientPath, p.ctx, null); + } + } else if (p.response instanceof MultiResponse) { + MultiCallback cb = (MultiCallback) p.cb; + MultiResponse rsp = (MultiResponse) p.response; + if (rc == 0) { + List<OpResult> results = rsp.getResultList(); + int newRc = rc; + for (OpResult result : results) { + if (result instanceof ErrorResult + && KeeperException.Code.OK.intValue() + != (newRc = ((ErrorResult) result).getErr())) { + break; + } + } + cb.processResult(newRc, clientPath, p.ctx, results); + } else { + cb.processResult(rc, clientPath, p.ctx, null); + } + } else if (p.cb instanceof VoidCallback) { + VoidCallback cb = (VoidCallback) p.cb; + cb.processResult(rc, clientPath, p.ctx); + } + } + } catch (Throwable t) { + LOG.error("Caught unexpected throwable", t); + } + } + } + + private void finishPacket(Packet p) { + if (p.watchRegistration != null) { + 
p.watchRegistration.register(p.replyHeader.getErr()); + } + + if (p.cb == null) { + synchronized (p) { + p.finished = true; + p.notifyAll(); + } + } else { + p.finished = true; + eventThread.queuePacket(p); + } + } + + private void conLossPacket(Packet p) { + if (p.replyHeader == null) { + return; + } + switch (state) { + case AUTH_FAILED: + p.replyHeader.setErr(KeeperException.Code.AUTHFAILED.intValue()); + break; + case CLOSED: + p.replyHeader.setErr(KeeperException.Code.SESSIONEXPIRED.intValue()); + break; + default: + p.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue()); + } + finishPacket(p); + } + + private volatile long lastZxid; + + public long getLastZxid() { + return lastZxid; + } + + static class EndOfStreamException extends IOException { + private static final long serialVersionUID = -5438877188796231422L; + + public EndOfStreamException(String msg) { + super(msg); + } + + @Override + public String toString() { + return "EndOfStreamException: " + getMessage(); + } + } + + private static class SessionTimeoutException extends IOException { + private static final long serialVersionUID = 824482094072071178L; + + public SessionTimeoutException(String msg) { + super(msg); + } + } + + private static class SessionExpiredException extends IOException { + private static final long serialVersionUID = -1388816932076193249L; + + public SessionExpiredException(String msg) { + super(msg); + } + } + + private static class RWServerFoundException extends IOException { + private static final long serialVersionUID = 90431199887158758L; + + public RWServerFoundException(String msg) { + super(msg); + } + } + + public static final int packetLen = Integer.getInteger("jute.maxbuffer", + 4096 * 1024); + + /** + * This class services the outgoing request queue and generates the heart + * beats. It also spawns the ReadThread. 
+ */ + class SendThread extends ZooKeeperThread { + private long lastPingSentNs; + private final ClientCnxnSocket clientCnxnSocket; + private Random r = new Random(System.nanoTime()); + private boolean isFirstConnect = true; + + void readResponse(ByteBuffer incomingBuffer) throws IOException { + ByteBufferInputStream bbis = new ByteBufferInputStream( + incomingBuffer); + BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis); + ReplyHeader replyHdr = new ReplyHeader(); + + replyHdr.deserialize(bbia, "header"); + if (replyHdr.getXid() == -2) { + // -2 is the xid for pings + if (LOG.isDebugEnabled()) { + LOG.debug("Got ping response for sessionid: 0x" + + Long.toHexString(sessionId) + + " after " + + ((System.nanoTime() - lastPingSentNs) / 1000000) + + "ms"); + } + return; + } + if (replyHdr.getXid() == -4) { + // -4 is the xid for AuthPacket + if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) { + state = States.AUTH_FAILED; + eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, + Watcher.Event.KeeperState.AuthFailed, null) ); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Got auth sessionid:0x" + + Long.toHexString(sessionId)); + } + return; + } + if (replyHdr.getXid() == -1) { + // -1 means notification + if (LOG.isDebugEnabled()) { + LOG.debug("Got notification sessionid:0x" + + Long.toHexString(sessionId)); + } + WatcherEvent event = new WatcherEvent(); + event.deserialize(bbia, "response"); + + // convert from a server path to a client path + if (chrootPath != null) { + String serverPath = event.getPath(); + if(serverPath.compareTo(chrootPath)==0) + event.setPath("/"); + else if (serverPath.length() > chrootPath.length()) + event.setPath(serverPath.substring(chrootPath.length())); + else { + LOG.warn("Got server path " + event.getPath() + + " which is too short for chroot path " + + chrootPath); + } + } + + WatchedEvent we = new WatchedEvent(event); + if (LOG.isDebugEnabled()) { + LOG.debug("Got " + we + " for sessionid 
0x" + + Long.toHexString(sessionId)); + } + + eventThread.queueEvent( we ); + return; + } + + // If SASL authentication is currently in progress, construct and + // send a response packet immediately, rather than queuing a + // response as with other packets. + if (clientTunneledAuthenticationInProgress()) { + GetSASLRequest request = new GetSASLRequest(); + request.deserialize(bbia,"token"); + zooKeeperSaslClient.respondToServer(request.getToken(), + ClientCnxn.this); + return; + } + + Packet packet; + synchronized (pendingQueue) { + if (pendingQueue.size() == 0) { + throw new IOException("Nothing in the queue, but got " + + replyHdr.getXid()); + } + packet = pendingQueue.remove(); + } + /* + * Since requests are processed in order, we better get a response + * to the first request! + */ + try { + if (packet.requestHeader.getXid() != replyHdr.getXid()) { + packet.replyHeader.setErr( + KeeperException.Code.CONNECTIONLOSS.intValue()); + throw new IOException("Xid out of order. Got Xid " + + replyHdr.getXid() + " with err " + + + replyHdr.getErr() + + " expected Xid " + + packet.requestHeader.getXid() + + " for a packet with details: " + + packet ); + } + + packet.replyHeader.setXid(replyHdr.getXid()); + packet.replyHeader.setErr(replyHdr.getErr()); + packet.replyHeader.setZxid(replyHdr.getZxid()); + if (replyHdr.getZxid() > 0) { + lastZxid = replyHdr.getZxid(); + } + if (packet.response != null && replyHdr.getErr() == 0) { + packet.response.deserialize(bbia, "response"); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Reading reply sessionid:0x" + + Long.toHexString(sessionId) + ", packet:: " + packet); + } + } finally { + finishPacket(packet); + } + } + + SendThread(ClientCnxnSocket clientCnxnSocket) { + super(makeThreadName("-SendThread()")); + state = States.CONNECTING; + this.clientCnxnSocket = clientCnxnSocket; + setDaemon(true); + } + + // TODO: can not name this method getState since Thread.getState() + // already exists + // It would be cleaner to make 
class SendThread an implementation of + // Runnable + /** + * Used by ClientCnxnSocket + * + * @return + */ + ZooKeeper.States getZkState() { + return state; + } + + ClientCnxnSocket getClientCnxnSocket() { + return clientCnxnSocket; + } + + void primeConnection() throws IOException { + LOG.info("Socket connection established to " + + clientCnxnSocket.getRemoteSocketAddress() + + ", initiating session"); + isFirstConnect = false; + long sessId = (seenRwServerBefore) ? sessionId : 0; + ConnectRequest conReq = new ConnectRequest(0, lastZxid, + sessionTimeout, sessId, sessionPasswd); + synchronized (outgoingQueue) { + // We add backwards since we are pushing into the front + // Only send if there's a pending watch + // TODO: here we have the only remaining use of zooKeeper in + // this class. It's to be eliminated! + if (!disableAutoWatchReset) { + List<String> dataWatches = zooKeeper.getDataWatches(); + List<String> existWatches = zooKeeper.getExistWatches(); + List<String> childWatches = zooKeeper.getChildWatches(); + if (!dataWatches.isEmpty() + || !existWatches.isEmpty() || !childWatches.isEmpty()) { + + Iterator<String> dataWatchesIter = prependChroot(dataWatches).iterator(); + Iterator<String> existWatchesIter = prependChroot(existWatches).iterator(); + Iterator<String> childWatchesIter = prependChroot(childWatches).iterator(); + long setWatchesLastZxid = lastZxid; + + while (dataWatchesIter.hasNext() + || existWatchesIter.hasNext() || childWatchesIter.hasNext()) { + List<String> dataWatchesBatch = new ArrayList<String>(); + List<String> existWatchesBatch = new ArrayList<String>(); + List<String> childWatchesBatch = new ArrayList<String>(); + int batchLength = 0; + + // Note, we may exceed our max length by a bit when we add the last + // watch in the batch. This isn't ideal, but it makes the code simpler. 
+ while (batchLength < SET_WATCHES_MAX_LENGTH) { + final String watch; + if (dataWatchesIter.hasNext()) { + watch = dataWatchesIter.next(); + dataWatchesBatch.add(watch); + } else if (existWatchesIter.hasNext()) { + watch = existWatchesIter.next(); + existWatchesBatch.add(watch); + } else if (childWatchesIter.hasNext()) { + watch = childWatchesIter.next(); + childWatchesBatch.add(watch); + } else { + break; + } + batchLength += watch.length(); + } + + SetWatches sw = new SetWatches(setWatchesLastZxid, + dataWatchesBatch, + existWatchesBatch, + childWatchesBatch); + RequestHeader h = new RequestHeader(); + h.setType(ZooDefs.OpCode.setWatches); + h.setXid(-8); + Packet packet = new Packet(h, new ReplyHeader(), sw, null, null); + outgoingQueue.addFirst(packet); + } + } + } + + for (AuthData id : authInfo) { + outgoingQueue.addFirst(new Packet(new RequestHeader(-4, + OpCode.auth), null, new AuthPacket(0, id.scheme, + id.data), null, null)); + } + outgoingQueue.addFirst(new Packet(null, null, conReq, + null, null, readOnly)); + } + clientCnxnSocket.enableReadWriteOnly(); + if (LOG.isDebugEnabled()) { + LOG.debug("Session establishment request sent on " + + clientCnxnSocket.getRemoteSocketAddress()); + } + } +

        /**
         * Rewrites, in place, every client path in {@code paths} into a
         * server path by prefixing it with the chroot. A one-character
         * client path (the root "/") maps to the chroot itself. Nothing is
         * changed when no chroot is configured or the list is empty.
         *
         * @param paths client paths; modified in place
         * @return the same list instance that was passed in
         */
        private List<String> prependChroot(List<String> paths) {
            if (chrootPath != null && !paths.isEmpty()) {
                for (int i = 0; i < paths.size(); ++i) {
                    String clientPath = paths.get(i);
                    String serverPath;
                    // handle clientPath = "/"
                    if (clientPath.length() == 1) {
                        serverPath = chrootPath;
                    } else {
                        serverPath = chrootPath + clientPath;
                    }
                    paths.set(i, serverPath);
                }
            }
            return paths;
        }

        /**
         * Records the send time and queues a ping request (xid -2).
         */
        private void sendPing() {
            lastPingSentNs = System.nanoTime();
            RequestHeader h = new RequestHeader(-2, OpCode.ping);
            queuePacket(h, null, null, null, null, null, null, null, null);
        }

        // Address of a read/write server found by pingRwServer(); consumed
        // (and reset to null) by the next connection attempt in run().
        private InetSocketAddress rwServerAddress = null;

        private static final int minPingRwTimeout = 100;

        private static final int maxPingRwTimeout = 60000;

        // Current interval between r/w server probes; doubled in run() up to
        // maxPingRwTimeout and reset to minPingRwTimeout once a r/w server
        // is found.
        private int pingRwTimeout = minPingRwTimeout;

        // Set to true if and only if constructor of ZooKeeperSaslClient
        // throws a LoginException: see startConnect() below.
        private boolean saslLoginFailed = false;

        /**
         * Starts a connection attempt to {@code addr}: moves the state to
         * CONNECTING, renames the thread after the target server, creates
         * the SASL client when SASL is enabled, and asks the socket layer to
         * connect.
         *
         * @param addr server to connect to
         * @throws IOException propagated from {@code clientCnxnSocket.connect}
         */
        private void startConnect(InetSocketAddress addr) throws IOException {
            // initializing it for new connection
            saslLoginFailed = false;
            state = States.CONNECTING;

            setName(getName().replaceAll("\\(.*\\)",
                    "(" + addr.getHostName() + ":" + addr.getPort() + ")"));
            if (ZooKeeperSaslClient.isEnabled()) {
                try {
                    String principalUserName = System.getProperty(
                            ZK_SASL_CLIENT_USERNAME, "zookeeper");
                    zooKeeperSaslClient =
                        new ZooKeeperSaslClient(
                                principalUserName + "/" + addr.getHostName());
                } catch (LoginException e) {
                    // An authentication error occurred when the SASL client tried to initialize:
                    // for Kerberos this means that the client failed to authenticate with the KDC.
                    // This is different from an authentication error that occurs during communication
                    // with the Zookeeper server, which is handled below.
                    LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without "
                            + "SASL authentication, if Zookeeper server allows it.");
                    eventThread.queueEvent(new WatchedEvent(
                            Watcher.Event.EventType.None,
                            Watcher.Event.KeeperState.AuthFailed, null));
                    saslLoginFailed = true;
                }
            }
            logStartConnect(addr);

            clientCnxnSocket.connect(addr);
        }

        /**
         * Logs the connection attempt, appending the SASL configuration
         * status when a SASL client exists.
         */
        private void logStartConnect(InetSocketAddress addr) {
            String msg = "Opening socket connection to server " + addr;
            if (zooKeeperSaslClient != null) {
                msg += ". " + zooKeeperSaslClient.getConfigStatus();
            }
            LOG.info(msg);
        }

        private static final String RETRY_CONN_MSG =
            ", closing socket connection and attempting reconnect";

        /**
         * Main loop of the send thread. While the client state is alive it
         * (re)connects when the socket is not connected, raises SASL
         * authentication events, enforces the read/connect timeout, sends
         * pings, probes for a read/write server while connected read-only,
         * and hands control to the socket layer via {@code doTransport}.
         * Any throwable raised inside one iteration triggers cleanup() and a
         * Disconnected event, unless the client is closing, in which case
         * the loop ends.
         */
        @Override
        public void run() {
            clientCnxnSocket.introduce(this, sessionId);
            clientCnxnSocket.updateNow();
            clientCnxnSocket.updateLastSendAndHeard();
            int to;
            long lastPingRwServer = Time.currentElapsedTime();
            final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
            InetSocketAddress serverAddress = null;
            while (state.isAlive()) {
                try {
                    if (!clientCnxnSocket.isConnected()) {
                        if (!isFirstConnect) {
                            // random back-off before reconnecting
                            try {
                                Thread.sleep(r.nextInt(1000));
                            } catch (InterruptedException e) {
                                LOG.warn("Unexpected exception", e);
                            }
                        }
                        // don't re-establish connection if we are closing
                        if (closing || !state.isAlive()) {
                            break;
                        }
                        if (rwServerAddress != null) {
                            serverAddress = rwServerAddress;
                            rwServerAddress = null;
                        } else {
                            serverAddress = hostProvider.next(1000);
                        }
                        startConnect(serverAddress);
                        clientCnxnSocket.updateLastSendAndHeard();
                    }

                    if (state.isConnected()) {
                        // determine whether we need to send an AuthFailed event.
                        if (zooKeeperSaslClient != null) {
                            boolean sendAuthEvent = false;
                            if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
                                try {
                                    zooKeeperSaslClient.initialize(ClientCnxn.this);
                                } catch (SaslException e) {
                                    LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
                                    state = States.AUTH_FAILED;
                                    sendAuthEvent = true;
                                }
                            }
                            KeeperState authState = zooKeeperSaslClient.getKeeperState();
                            if (authState != null) {
                                if (authState == KeeperState.AuthFailed) {
                                    // An authentication error occurred during authentication with the Zookeeper Server.
                                    state = States.AUTH_FAILED;
                                    sendAuthEvent = true;
                                } else {
                                    if (authState == KeeperState.SaslAuthenticated) {
                                        sendAuthEvent = true;
                                    }
                                }
                            }

                            if (sendAuthEvent) {
                                eventThread.queueEvent(new WatchedEvent(
                                        Watcher.Event.EventType.None,
                                        authState, null));
                            }
                        }
                        to = readTimeout - clientCnxnSocket.getIdleRecv();
                    } else {
                        to = connectTimeout - clientCnxnSocket.getIdleRecv();
                    }

                    if (to <= 0) {
                        String warnInfo;
                        warnInfo = "Client session timed out, have not heard from server in "
                            + clientCnxnSocket.getIdleRecv()
                            + "ms"
                            + " for sessionid 0x"
                            + Long.toHexString(sessionId);
                        LOG.warn(warnInfo);
                        throw new SessionTimeoutException(warnInfo);
                    }
                    if (state.isConnected()) {
                        //1000(1 second) is to prevent race condition missing to send the second ping
                        //also make sure not to send too many pings when readTimeout is small
                        int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -
                                ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
                        //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
                        if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
                            sendPing();
                            clientCnxnSocket.updateLastSend();
                        } else {
                            if (timeToNextPing < to) {
                                to = timeToNextPing;
                            }
                        }
                    }

                    // If we are in read-only mode, seek for read/write server
                    if (state == States.CONNECTEDREADONLY) {
                        long now = Time.currentElapsedTime();
                        int idlePingRwServer = (int) (now - lastPingRwServer);
                        if (idlePingRwServer >= pingRwTimeout) {
                            lastPingRwServer = now;
                            idlePingRwServer = 0;
                            pingRwTimeout =
                                Math.min(2 * pingRwTimeout, maxPingRwTimeout);
                            pingRwServer();
                        }
                        to = Math.min(to, pingRwTimeout - idlePingRwServer);
                    }

                    clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
                } catch (Throwable e) {
                    if (closing) {
                        if (LOG.isDebugEnabled()) {
                            // closing so this is expected
                            LOG.debug("An exception was thrown while closing send thread for session 0x"
                                    + Long.toHexString(getSessionId())
                                    + " : " + e.getMessage());
                        }
                        break;
                    } else {
                        // this is ugly, you have a better way speak up
                        if (e instanceof SessionExpiredException) {
                            LOG.info(e.getMessage() + ", closing socket connection");
                        } else if (e instanceof SessionTimeoutException) {
                            LOG.info(e.getMessage() + RETRY_CONN_MSG);
                        } else if (e instanceof EndOfStreamException) {
                            LOG.info(e.getMessage() + RETRY_CONN_MSG);
                        } else if (e instanceof RWServerFoundException) {
                            LOG.info(e.getMessage());
                        } else if (e instanceof SocketException) {
                            LOG.info("Socket error occurred: {}: {}", serverAddress, e.getMessage());
                        } else {
                            LOG.warn("Session 0x{} for server {}, unexpected error{}",
                                    Long.toHexString(getSessionId()),
                                    serverAddress,
                                    RETRY_CONN_MSG,
                                    e);
                        }
                        cleanup();
                        if (state.isAlive()) {
                            eventThread.queueEvent(new WatchedEvent(
                                    Event.EventType.None,
                                    Event.KeeperState.Disconnected,
                                    null));
                        }
                        clientCnxnSocket.updateNow();
                        clientCnxnSocket.updateLastSendAndHeard();
                    }
                }
            }
            cleanup();
            clientCnxnSocket.close();
            if (state.isAlive()) {
                eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
                        Event.KeeperState.Disconnected, null));
            }
            ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
                    "SendThread exited loop for session: 0x"
                           + Long.toHexString(getSessionId()));
        }

        /**
         * Probes the next server from the host provider with the "isro"
         * four-letter command over a short-lived plain socket. When the
         * reply is "rw" the address is remembered in {@code rwServerAddress}
         * and an exception is thrown so that run() drops the current
         * connection and reconnects to that server.
         *
         * @throws RWServerFoundException when the probed server answers "rw"
         * @throws UnknownHostException declared by the original signature
         */
        private void pingRwServer() throws RWServerFoundException, UnknownHostException {
            String result = null;
            InetSocketAddress addr = hostProvider.next(0);
            LOG.info("Checking server " + addr + " for being r/w." +
                    " Timeout " + pingRwTimeout);

            // The command and the replies compared below are plain ASCII;
            // pin the charset instead of relying on the platform default.
            final java.nio.charset.Charset ascii =
                    java.nio.charset.Charset.forName("US-ASCII");
            Socket sock = null;
            BufferedReader br = null;
            try {
                sock = new Socket(addr.getHostName(), addr.getPort());
                sock.setSoLinger(false, -1);
                sock.setSoTimeout(1000);
                sock.setTcpNoDelay(true);
                sock.getOutputStream().write("isro".getBytes(ascii));
                sock.getOutputStream().flush();
                sock.shutdownOutput();
                br = new BufferedReader(
                        new InputStreamReader(sock.getInputStream(), ascii));
                result = br.readLine();
            } catch (ConnectException e) {
                // ignore, this just means server is not up
            } catch (IOException e) {
                // some unexpected error, warn about it
                LOG.warn("Exception while seeking for r/w server " +
                        e.getMessage(), e);
            } finally {
                // close the reader before the socket it wraps
                if (br != null) {
                    try {
                        br.close();
                    } catch (IOException e) {
                        LOG.warn("Unexpected exception", e);
                    }
                }
                if (sock != null) {
                    try {
                        sock.close();
                    } catch (IOException e) {
                        LOG.warn("Unexpected exception", e);
                    }
                }
            }

            if ("rw".equals(result)) {
                pingRwTimeout = minPingRwTimeout;
                // save the found address so that it's used during the next
                // connection attempt
                rwServerAddress = addr;
                throw new RWServerFoundException("Majority server found at "
                        + addr.getHostName() + ":" + addr.getPort());
            }
        }

        /**
         * Cleans up the socket and fails every packet still sitting in the
         * pending and outgoing queues with a connection-loss result, then
         * empties both queues.
         */
        private void cleanup() {
            clientCnxnSocket.cleanup();
            synchronized (pendingQueue) {
                for (Packet p : pendingQueue) {
                    conLossPacket(p);
                }
                pendingQueue.clear();
            }
            synchronized (outgoingQueue) {
                for (Packet p : outgoingQueue) {
                    conLossPacket(p);
                }
                outgoingQueue.clear();
            }
        }

        /**
         * Callback invoked by the ClientCnxnSocket once a connection has been
         * established.
+ * + * @param _negotiatedSessionTimeout + * @param _sessionId + * @param _sessionPasswd + * @param isRO + * @throws IOException + */ + void onConnected(int _negotiatedSessionTimeout, long _sessionId, + byte[] _sessionPasswd, boolean isRO) throws IOException { + negotiatedSessionTimeout = _negotiatedSessionTimeout; + if (negotiatedSessionTimeout <= 0) { + state = States.CLOSED; + + eventThread.queueEvent(new WatchedEvent( + Watcher.Event.EventType.None, + Watcher.Event.KeeperState.Expired, null)); + eventThread.queueEventOfDeath(); + + String warnInfo; + warnInfo = "Unable to reconnect to ZooKeeper service, session 0x" + + Long.toHexString(sessionId) + " has expired"; + LOG.warn(warnInfo); + throw new SessionExpiredException(warnInfo); + } + if (!readOnly && isRO) { + LOG.error("Read/write client got connected to read-only server"); + } + readTimeout = negotiatedSessionTimeout * 2 / 3; + connectTimeout = negotiatedSessionTimeout / hostProvider.size(); + hostProvider.onConnected(); + sessionId = _sessionId; + sessionPasswd = _sessionPasswd; + state = (isRO) ? + States.CONNECTEDREADONLY : States.CONNECTED; + seenRwServerBefore |= !isRO; + LOG.info("Session establishment complete on server " + + clientCnxnSocket.getRemoteSocketAddress() + + ", sessionid = 0x" + Long.toHexString(sessionId) + + ", negotiated timeout = " + negotiatedSessionTimeout + + (isRO ? " (READ-ONLY mode)" : "")); + KeeperState eventState = (isRO) ? + KeeperState.ConnectedReadOnly : KeeperState.SyncConnected; + eventThread.queueEvent(new WatchedEvent( + Watcher.Event.EventType.None, + eventState, null)); + } + + void close() { + state = States.CLOSED; + clientCnxnSocket.wakeupCnxn(); + } + + void testableCloseSocket() throws IOException { + clientCnxnSocket.testableCloseSocket(); + } + + public boolean clientTunneledAuthenticationInProgress() { + // 1. SASL client is disabled. + if (!ZooKeeperSaslClient.isEnabled()) { + return false; + } + + // 2. SASL login failed. 
+ if (saslLoginFailed == true) { + return false; + } + + // 3. SendThread has not created the authenticating object yet, + // therefore authentication is (at the earliest stage of being) in progress. + if (zooKeeperSaslClient == null) { + return true; + } + + // 4. authenticating object exists, so ask it for its progress. + return zooKeeperSaslClient.clientTunneledAuthenticationInProgress(); + } + + public void sendPacket(Packet p) throws IOException { + clientCnxnSocket.sendPacket(p); + } + } + + /** + * Shutdown the send/event threads. This method should not be called + * directly - rather it should be called as part of close operation. This + * method is primarily here to allow the tests to verify disconnection + * behavior. + */ + public void disconnect() { + if (LOG.isDebugEnabled()) { + LOG.debug("Disconnecting client for session: 0x" + + Long.toHexString(getSessionId())); + } + + sendThread.close(); + eventThread.queueEventOfDeath(); + } + + /** + * Close the connection, which includes; send session disconnect to the + * server, shutdown the send/event threads. + * + * @throws IOException + */ + public void close() throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Closing client for session: 0x" + + Long.toHexString(getSessionId())); + } + + try { + RequestHeader h = new RequestHeader(); + h.setType(ZooDefs.OpCode.closeSession); + + submitRequest(h, null, null, null); + } catch (InterruptedException e) { + // ignore, close the send/event threads + } finally { + disconnect(); + } + } + + private int xid = 1; + + // @VisibleForTesting + volatile States state = States.NOT_CONNECTED; + + /* + * getXid() is called externally by ClientCnxnNIO::doIO() when packets are sent from the outgoingQueue to + * the server. Thus, getXid() must be public. 
+ */
    public synchronized int getXid() {
        // Never hand out negative xids: incrementing past Integer.MAX_VALUE
        // would wrap to negative numbers and eventually reach the values
        // this client reserves for pings (-2), auth packets (-4) and
        // setWatches (-8). Skip from MAX_VALUE back to 1 instead.
        if (xid == Integer.MAX_VALUE) {
            xid = 1;
        }
        return xid++;
    }

    /**
     * Queues a request and blocks the calling thread until the packet is
     * marked finished.
     *
     * @param h request header
     * @param request request record, may be null
     * @param response record to receive the response, may be null
     * @param watchRegistration watch to register with the packet, may be null
     * @return the reply header attached to the queued packet
     * @throws InterruptedException if interrupted while waiting
     */
    public ReplyHeader submitRequest(RequestHeader h, Record request,
            Record response, WatchRegistration watchRegistration)
            throws InterruptedException {
        ReplyHeader r = new ReplyHeader();
        Packet packet = queuePacket(h, r, request, response, null, null, null,
                    null, watchRegistration);
        synchronized (packet) {
            while (!packet.finished) {
                packet.wait();
            }
        }
        return r;
    }

    /** Asks the socket layer to register interest in writing. */
    public void enableWrite() {
        sendThread.getClientCnxnSocket().enableWrite();
    }

    /**
     * Builds a packet with a freshly generated xid and sends it immediately
     * through the send thread, bypassing the outgoing queue.
     *
     * @param request request record
     * @param response record to receive the response
     * @param cb callback stored on the packet
     * @param opCode request type written to the request header
     * @throws IOException propagated from the send thread
     */
    public void sendPacket(Record request, Record response, AsyncCallback cb, int opCode)
            throws IOException {
        // Generate Xid now because it will be sent immediately,
        // by call to sendThread.sendPacket() below.
        int requestXid = getXid();
        RequestHeader h = new RequestHeader();
        h.setXid(requestXid);
        h.setType(opCode);

        ReplyHeader r = new ReplyHeader();
        r.setXid(requestXid);

        Packet p = new Packet(h, r, request, response, null, false);
        p.cb = cb;
        sendThread.sendPacket(p);
    }

    /**
     * Creates a packet and appends it to the outgoing queue, or fails it
     * with a connection-loss result when the client is no longer alive or
     * is already closing. A closeSession request marks the client as
     * closing. The socket layer is woken up in either case.
     *
     * @return the packet that was created
     */
    Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
            Record response, AsyncCallback cb, String clientPath,
            String serverPath, Object ctx, WatchRegistration watchRegistration)
    {
        Packet packet;

        // Note that we do not generate the Xid for the packet yet. It is
        // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
        // where the packet is actually sent.
        synchronized (outgoingQueue) {
            packet = new Packet(h, r, request, response, watchRegistration);
            packet.cb = cb;
            packet.ctx = ctx;
            packet.clientPath = clientPath;
            packet.serverPath = serverPath;
            if (!state.isAlive() || closing) {
                conLossPacket(packet);
            } else {
                // If the client is asking to close the session then
                // mark as closing
                if (h.getType() == OpCode.closeSession) {
                    closing = true;
                }
                outgoingQueue.add(packet);
            }
        }
        sendThread.getClientCnxnSocket().wakeupCnxn();
        return packet;
    }

    /**
     * Remembers the credentials and queues an auth packet (xid -4) for
     * them. Does nothing when the client state is no longer alive.
     *
     * @param scheme authentication scheme
     * @param auth authentication data
     */
    public void addAuthInfo(String scheme, byte[] auth) {
        if (!state.isAlive()) {
            return;
        }
        authInfo.add(new AuthData(scheme, auth));
        queuePacket(new RequestHeader(-4, OpCode.auth), null,
                new AuthPacket(0, scheme, auth), null, null, null, null,
                null, null);
    }

    /** @return the current client state */
    States getState() {
        return state;
    }
}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/7291e47c/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java new file mode 100644 index 0000000..b676531 --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java @@ -0,0 +1,175 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zookeeper;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;

import org.apache.jute.BinaryInputArchive;
import org.apache.zookeeper.ClientCnxn.Packet;
import org.apache.zookeeper.common.Time;
import org.apache.zookeeper.proto.ConnectResponse;
import org.apache.zookeeper.server.ByteBufferInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A ClientCnxnSocket does the lower level communication with a socket
 * implementation.
 *
 * This code has been moved out of ClientCnxn so that a Netty implementation can
 * be provided as an alternative to the NIO socket code.
 *
 */
abstract class ClientCnxnSocket {
    private static final Logger LOG = LoggerFactory.getLogger(ClientCnxnSocket.class);

    protected boolean initialized;

    /**
     * This buffer is only used to read the length of the incoming message.
     */
    protected final ByteBuffer lenBuffer = ByteBuffer.allocateDirect(4);

    /**
     * After the length is read, a new incomingBuffer is allocated in
     * readLength() to receive the full message.
     */
    protected ByteBuffer incomingBuffer = lenBuffer;
    protected long sentCount = 0;
    protected long recvCount = 0;
    protected long lastHeard;
    protected long lastSend;
    protected long now;
    protected ClientCnxn.SendThread sendThread;

    /**
     * The sessionId is only available here for Log and Exception messages.
     * Otherwise the socket doesn't need to know it.
     */
    protected long sessionId;

    /**
     * Gives this socket the send thread that drives it and the session id
     * used in its messages.
     */
    void introduce(ClientCnxn.SendThread sendThread, long sessionId) {
        this.sendThread = sendThread;
        this.sessionId = sessionId;
    }

    /** Refreshes the cached clock value all idle computations are based on. */
    void updateNow() {
        now = Time.currentElapsedTime();
    }

    /** @return time between the cached {@code now} and the last receive mark */
    int getIdleRecv() {
        return (int) (now - lastHeard);
    }

    /** @return time between the cached {@code now} and the last send mark */
    int getIdleSend() {
        return (int) (now - lastSend);
    }

    long getSentCount() {
        return sentCount;
    }

    long getRecvCount() {
        return recvCount;
    }

    void updateLastHeard() {
        this.lastHeard = now;
    }

    void updateLastSend() {
        this.lastSend = now;
    }

    void updateLastSendAndHeard() {
        this.lastSend = now;
        this.lastHeard = now;
    }

    /**
     * Reads the 4-byte length prefix from {@code incomingBuffer} and
     * replaces that buffer with one sized to hold the announced message.
     *
     * @throws IOException if the length is negative or not below
     *                     {@code ClientCnxn.packetLen}
     */
    protected void readLength() throws IOException {
        int len = incomingBuffer.getInt();
        if (len < 0 || len >= ClientCnxn.packetLen) {
            throw new IOException("Packet len " + len + " is out of range!");
        }
        incomingBuffer = ByteBuffer.allocate(len);
    }

    /**
     * Deserializes the connect response held in {@code incomingBuffer},
     * reads the optional trailing read-only flag, records the session id
     * and notifies the send thread that the connection is established.
     *
     * @throws IOException propagated from deserialization or from
     *                     {@code sendThread.onConnected}
     */
    void readConnectResult() throws IOException {
        if (LOG.isTraceEnabled()) {
            StringBuilder buf = new StringBuilder("0x[");
            for (byte b : incomingBuffer.array()) {
                // mask to avoid sign extension ("ffffff80" instead of "80")
                buf.append(Integer.toHexString(b & 0xff)).append(',');
            }
            buf.append("]");
            LOG.trace("readConnectResult " + incomingBuffer.remaining() + " "
                    + buf.toString());
        }
        ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
        BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
        ConnectResponse conRsp = new ConnectResponse();
        conRsp.deserialize(bbia, "connect");

        // read "is read-only" flag
        boolean isRO = false;
        try {
            isRO = bbia.readBool("readOnly");
        } catch (IOException e) {
            // this is ok -- just a packet from an old server which
            // doesn't contain readOnly field
            LOG.warn("Connected to an old server; r-o mode will be unavailable");
        }

        this.sessionId = conRsp.getSessionId();
        sendThread.onConnected(conRsp.getTimeOut(), this.sessionId,
                conRsp.getPasswd(), isRO);
    }

    abstract boolean isConnected();

    abstract void connect(InetSocketAddress addr) throws IOException;

    abstract SocketAddress getRemoteSocketAddress();

    abstract SocketAddress getLocalSocketAddress();

    abstract void cleanup();

    abstract void close();

    abstract void wakeupCnxn();

    abstract void enableWrite();

    abstract void disableWrite();

    abstract void enableReadWriteOnly();

    abstract void doTransport(int waitTimeOut, List<Packet> pendingQueue,
            LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
            throws IOException, InterruptedException;

    abstract void testableCloseSocket() throws IOException;

    abstract void sendPacket(Packet p) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/7291e47c/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNIO.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNIO.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNIO.java new file mode 100644 index 0000000..720619d ---
/dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNIO.java @@ -0,0 +1,431 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.util.LinkedList; +import java.util.List; +import java.util.ListIterator; +import java.util.Set; + +import org.apache.zookeeper.ClientCnxn.EndOfStreamException; +import org.apache.zookeeper.ClientCnxn.Packet; +import org.apache.zookeeper.ZooDefs.OpCode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * ClientCnxnSocket implementation built on a java.nio Selector and the SelectionKey of one SocketChannel. + */ +public class ClientCnxnSocketNIO extends ClientCnxnSocket { + private static final Logger LOG = LoggerFactory + .getLogger(ClientCnxnSocketNIO.class); + + private final Selector selector = Selector.open(); + + private SelectionKey sockKey; + + ClientCnxnSocketNIO() throws IOException { + super(); + } + + /** + * @return true while a selection key is held, i.e. sockKey is not null + */ + @Override + boolean isConnected() { + return sockKey != null; + } + + /** + * Reads from and/or writes to the socket channel, depending on whether the selection key is readable and/or writable. + * Returns nothing: incoming data is handed to the send thread and fully written request packets are moved to the pending queue. + * + * @throws InterruptedException + * 
@throws IOException + */ + void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn) + throws InterruptedException, IOException { + SocketChannel sock = (SocketChannel) sockKey.channel(); + if (sock == null) { + throw new IOException("Socket is null!"); + } + if (sockKey.isReadable()) { + int rc = sock.read(incomingBuffer); + if (rc < 0) { + throw new EndOfStreamException( + "Unable to read additional data from server sessionid 0x" + + Long.toHexString(sessionId) + + ", likely server has closed socket"); + } + if (!incomingBuffer.hasRemaining()) { + incomingBuffer.flip(); + if (incomingBuffer == lenBuffer) { + recvCount++; + readLength(); + } else if (!initialized) { + readConnectResult(); + enableRead(); + if (findSendablePacket(outgoingQueue, + cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) { + // Since SASL authentication has completed (if client is configured to do so), + // outgoing packets waiting in the outgoingQueue can now be sent. 
+ enableWrite(); + } + lenBuffer.clear(); + incomingBuffer = lenBuffer; + updateLastHeard(); + initialized = true; + } else { + sendThread.readResponse(incomingBuffer); + lenBuffer.clear(); + incomingBuffer = lenBuffer; + updateLastHeard(); + } + } + } + if (sockKey.isWritable()) { + synchronized(outgoingQueue) { + Packet p = findSendablePacket(outgoingQueue, + cnxn.sendThread.clientTunneledAuthenticationInProgress()); + + if (p != null) { + updateLastSend(); + // If we already started writing p, p.bb will already exist + if (p.bb == null) { + if ((p.requestHeader != null) && + (p.requestHeader.getType() != OpCode.ping) && + (p.requestHeader.getType() != OpCode.auth)) { + p.requestHeader.setXid(cnxn.getXid()); + } + p.createBB(); + } + sock.write(p.bb); + if (!p.bb.hasRemaining()) { + sentCount++; + outgoingQueue.removeFirstOccurrence(p); + if (p.requestHeader != null + && p.requestHeader.getType() != OpCode.ping + && p.requestHeader.getType() != OpCode.auth) { + synchronized (pendingQueue) { + pendingQueue.add(p); + } + } + } + } + if (outgoingQueue.isEmpty()) { + // No more packets to send: turn off write interest flag. + // Will be turned on later by a later call to enableWrite(), + // from within ZooKeeperSaslClient (if client is configured + // to attempt SASL authentication), or in either doIO() or + // in doTransport() if not. + disableWrite(); + } else if (!initialized && p != null && !p.bb.hasRemaining()) { + // On initial connection, write the complete connect request + // packet, but then disable further writes until after + // receiving a successful connection response. If the + // session is expired, then the server sends the expiration + // response and immediately closes its end of the socket. If + // the client is simultaneously writing on its end, then the + // TCP stack may choose to abort with RST, in which case the + // client would never receive the session expired event. 
See + // http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html + disableWrite(); + } else { + // Just in case + enableWrite(); + } + } + } + } + + private Packet findSendablePacket(LinkedList<Packet> outgoingQueue, + boolean clientTunneledAuthenticationInProgress) { + synchronized (outgoingQueue) { + if (outgoingQueue.isEmpty()) { + return null; + } + if (outgoingQueue.getFirst().bb != null // If we've already starting sending the first packet, we better finish + || !clientTunneledAuthenticationInProgress) { + return outgoingQueue.getFirst(); + } + + // Since client's authentication with server is in progress, + // send only the null-header packet queued by primeConnection(). + // This packet must be sent so that the SASL authentication process + // can proceed, but all other packets should wait until + // SASL authentication completes. + ListIterator<Packet> iter = outgoingQueue.listIterator(); + while (iter.hasNext()) { + Packet p = iter.next(); + if (p.requestHeader == null) { + // We've found the priming-packet. Move it to the beginning of the queue. + iter.remove(); + outgoingQueue.add(0, p); + return p; + } else { + // Non-priming packet: defer it until later, leaving it in the queue + // until authentication completes. + if (LOG.isDebugEnabled()) { + LOG.debug("deferring non-priming packet: " + p + + "until SASL authentication completes."); + } + } + } + // no sendable packet found. 
+ return null; + } + } + + @Override + void cleanup() { + if (sockKey != null) { + SocketChannel sock = (SocketChannel) sockKey.channel(); + sockKey.cancel(); + try { + sock.socket().shutdownInput(); + } catch (IOException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Ignoring exception during shutdown input", e); + } + } + try { + sock.socket().shutdownOutput(); + } catch (IOException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Ignoring exception during shutdown output", + e); + } + } + try { + sock.socket().close(); + } catch (IOException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Ignoring exception during socket close", e); + } + } + try { + sock.close(); + } catch (IOException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Ignoring exception during channel close", e); + } + } + } + try { + Thread.sleep(100); + } catch (InterruptedException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("SendThread interrupted during sleep, ignoring"); + } + } + sockKey = null; + } + + @Override + void close() { + try { + if (LOG.isTraceEnabled()) { + LOG.trace("Doing client selector close"); + } + selector.close(); + if (LOG.isTraceEnabled()) { + LOG.trace("Closed client selector"); + } + } catch (IOException e) { + LOG.warn("Ignoring exception during selector close", e); + } + } + + /** + * create a socket channel. 
+ * @return the created socket channel + * @throws IOException + */ + SocketChannel createSock() throws IOException { + SocketChannel sock; + sock = SocketChannel.open(); + sock.configureBlocking(false); + sock.socket().setSoLinger(false, -1); + sock.socket().setTcpNoDelay(true); + return sock; + } + + /** + * register with the selection and connect + * @param sock the {@link SocketChannel} + * @param addr the address of remote host + * @throws IOException + */ + void registerAndConnect(SocketChannel sock, InetSocketAddress addr) + throws IOException { + sockKey = sock.register(selector, SelectionKey.OP_CONNECT); + boolean immediateConnect = sock.connect(addr); + if (immediateConnect) { + sendThread.primeConnection(); + } + } + + @Override + void connect(InetSocketAddress addr) throws IOException { + SocketChannel sock = createSock(); + try { + registerAndConnect(sock, addr); + } catch (IOException e) { + LOG.error("Unable to open socket to " + addr); + sock.close(); + throw e; + } + initialized = false; + + /* + * Reset incomingBuffer + */ + lenBuffer.clear(); + incomingBuffer = lenBuffer; + } + + /** + * Returns the address to which the socket is connected. + * + * @return ip address of the remote side of the connection or null if not + * connected + */ + @Override + SocketAddress getRemoteSocketAddress() { + // a lot could go wrong here, so rather than put in a bunch of code + // to check for nulls all down the chain let's do it the simple + // yet bulletproof way + try { + return ((SocketChannel) sockKey.channel()).socket() + .getRemoteSocketAddress(); + } catch (NullPointerException e) { + return null; + } + } + + /** + * Returns the local address to which the socket is bound. 
+ * + * @return ip address of the remote side of the connection or null if not + * connected + */ + @Override + SocketAddress getLocalSocketAddress() { + // a lot could go wrong here, so rather than put in a bunch of code + // to check for nulls all down the chain let's do it the simple + // yet bulletproof way + try { + return ((SocketChannel) sockKey.channel()).socket() + .getLocalSocketAddress(); + } catch (NullPointerException e) { + return null; + } + } + + @Override + synchronized void wakeupCnxn() { + selector.wakeup(); + } + + @Override + void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, + ClientCnxn cnxn) + throws IOException, InterruptedException { + selector.select(waitTimeOut); + Set<SelectionKey> selected; + synchronized (this) { + selected = selector.selectedKeys(); + } + // Everything below and until we get back to the select is + // non blocking, so time is effectively a constant. That is + // Why we just have to do this once, here + updateNow(); + for (SelectionKey k : selected) { + SocketChannel sc = ((SocketChannel) k.channel()); + if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) { + if (sc.finishConnect()) { + updateLastSendAndHeard(); + sendThread.primeConnection(); + } + } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) { + doIO(pendingQueue, outgoingQueue, cnxn); + } + } + if (sendThread.getZkState().isConnected()) { + synchronized(outgoingQueue) { + if (findSendablePacket(outgoingQueue, + cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) { + enableWrite(); + } + } + } + selected.clear(); + } + + //TODO should this be synchronized? 
+ @Override + void testableCloseSocket() throws IOException { + LOG.info("testableCloseSocket() called"); + ((SocketChannel) sockKey.channel()).socket().close(); + } + + @Override + synchronized void enableWrite() { + int i = sockKey.interestOps(); + if ((i & SelectionKey.OP_WRITE) == 0) { + sockKey.interestOps(i | SelectionKey.OP_WRITE); + } + } + + @Override + public synchronized void disableWrite() { + int i = sockKey.interestOps(); + if ((i & SelectionKey.OP_WRITE) != 0) { + sockKey.interestOps(i & (~SelectionKey.OP_WRITE)); + } + } + + synchronized private void enableRead() { + int i = sockKey.interestOps(); + if ((i & SelectionKey.OP_READ) == 0) { + sockKey.interestOps(i | SelectionKey.OP_READ); + } + } + + @Override + synchronized void enableReadWriteOnly() { + sockKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); + } + + Selector getSelector() { + return selector; + } + + @Override + void sendPacket(Packet p) throws IOException { + SocketChannel sock = (SocketChannel) sockKey.channel(); + if (sock == null) { + throw new IOException("Socket is null!"); + } + p.createBB(); + ByteBuffer pbb = p.bb; + sock.write(pbb); + } + + +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/7291e47c/zookeeper-server/src/main/java/org/apache/zookeeper/ClientWatchManager.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientWatchManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientWatchManager.java new file mode 100644 index 0000000..d56374d --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientWatchManager.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.zookeeper;

import java.util.Set;

/**
 * Resolves a watch event into the set of watchers that have to be notified
 * of it.
 */
public interface ClientWatchManager {
    /**
     * Return a set of watchers that should be notified of the event. The
     * manager must not notify the watcher(s), however it will update its
     * internal structure as if the watches had triggered. The intent being
     * that the caller is now responsible for notifying the watchers of the
     * event, possibly at some later time.
     *
     * @param state event state
     * @param type event type
     * @param path event path
     * @return may be empty set but must not be null
     */
    Set<Watcher> materialize(Watcher.Event.KeeperState state,
        Watcher.Event.EventType type, String path);
}