http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderImpl.java
----------------------------------------------------------------------
diff --git 
a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderImpl.java
 
b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderImpl.java
deleted file mode 100644
index 85e1bc0..0000000
--- 
a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderImpl.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.wan.serial;
-import java.util.Set;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
-import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
-import com.gemstone.gemfire.distributed.DistributedLockService;
-import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile;
-import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
-import com.gemstone.gemfire.distributed.internal.ResourceEvent;
-import com.gemstone.gemfire.internal.cache.EntryEventImpl;
-import com.gemstone.gemfire.internal.cache.EventID;
-import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
-import com.gemstone.gemfire.internal.cache.RegionQueue;
-import com.gemstone.gemfire.internal.cache.UpdateAttributesProcessor;
-import com.gemstone.gemfire.internal.cache.ha.ThreadIdentifier;
-import 
com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
-import com.gemstone.gemfire.internal.cache.wan.AbstractRemoteGatewaySender;
-import 
com.gemstone.gemfire.internal.cache.wan.GatewaySenderAdvisor.GatewaySenderProfile;
-import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAttributes;
-import 
com.gemstone.gemfire.internal.cache.wan.GatewaySenderConfigurationException;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
-import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
-
-/**
- * @since GemFire 7.0
- *
- */
-public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender {
-
-  private static final Logger logger = LogService.getLogger();
-  
-  final ThreadGroup loggerGroup = LoggingThreadGroup.createThreadGroup(
-      "Remote Site Discovery Logger Group", logger);
-
-  public SerialGatewaySenderImpl(){
-    super();
-    this.isParallel = false;
-  }
-  public SerialGatewaySenderImpl(Cache cache,
-      GatewaySenderAttributes attrs) {
-    super(cache, attrs);
-  }
-  
-  @Override
-  public void start() {
-    if (logger.isDebugEnabled()) {
-      logger.debug("Starting gatewaySender : {}", this);
-    }
-    
-    this.getLifeCycleLock().writeLock().lock();
-    try {
-      if (isRunning()) {
-        
logger.warn(LocalizedMessage.create(LocalizedStrings.GatewaySender_SENDER_0_IS_ALREADY_RUNNING,
 this.getId()));
-        return;
-      }
-      if (this.remoteDSId != DEFAULT_DISTRIBUTED_SYSTEM_ID) {
-        String locators = ((GemFireCacheImpl)this.cache).getDistributedSystem()
-            .getConfig().getLocators();
-        if (locators.length() == 0) {
-          throw new GatewaySenderConfigurationException(
-              
LocalizedStrings.AbstractGatewaySender_LOCATOR_SHOULD_BE_CONFIGURED_BEFORE_STARTING_GATEWAY_SENDER
-                  .toLocalizedString());
-        }
-      }
-      getSenderAdvisor().initDLockService();
-      if (!isPrimary()) {
-        if (getSenderAdvisor().volunteerForPrimary()) {
-          getSenderAdvisor().makePrimary();
-        } else {
-          getSenderAdvisor().makeSecondary();
-        }
-      }
-      if (getDispatcherThreads() > 1) {
-        eventProcessor = new RemoteConcurrentSerialGatewaySenderEventProcessor(
-            SerialGatewaySenderImpl.this);
-      } else {
-        eventProcessor = new RemoteSerialGatewaySenderEventProcessor(
-            SerialGatewaySenderImpl.this, getId());
-      }
-      eventProcessor.start();
-      waitForRunningStatus();
-      this.startTime = System.currentTimeMillis();
-      
-      //Only notify the type registry if this is a WAN gateway queue
-      if(!isAsyncEventQueue()) {
-        ((GemFireCacheImpl) 
getCache()).getPdxRegistry().gatewaySenderStarted(this);
-      }
-      new UpdateAttributesProcessor(this).distribute(false);
-
-      
-      InternalDistributedSystem system = (InternalDistributedSystem) this.cache
-          .getDistributedSystem();
-      system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_START, this);
-      
-      
logger.info(LocalizedMessage.create(LocalizedStrings.SerialGatewaySenderImpl_STARTED__0,
 this));
-  
-      enqueueTempEvents();
-    } finally {
-      this.getLifeCycleLock().writeLock().unlock();
-    }
-  }
-  
-  @Override
-  public void stop() {
-    if (logger.isDebugEnabled()) {
-      logger.debug("Stopping Gateway Sender : {}", this);
-    }
-    this.getLifeCycleLock().writeLock().lock();
-    try {
-      // Stop the dispatcher
-      AbstractGatewaySenderEventProcessor ev = this.eventProcessor;
-      if (ev != null && !ev.isStopped()) {
-        ev.stopProcessing();
-      }
-      
-      // Stop the proxy (after the dispatcher, so the socket is still
-      // alive until after the dispatcher has stopped)
-      stompProxyDead();
-
-      // Close the listeners
-      for (AsyncEventListener listener : this.listeners) {
-        listener.close();
-      }
-      
logger.info(LocalizedMessage.create(LocalizedStrings.GatewayImpl_STOPPED__0, 
this));
-
-      clearTempEventsAfterSenderStopped();
-    } finally {
-      this.getLifeCycleLock().writeLock().unlock();
-    }
-   
-    if (this.isPrimary()) {
-      try {
-        DistributedLockService
-            .destroy(getSenderAdvisor().getDLockServiceName());
-      } catch (IllegalArgumentException e) {
-        // service not found... ignore
-      }
-    }
-    Set<RegionQueue> queues = getQueues();
-    if (queues != null && !queues.isEmpty()) {
-      for (RegionQueue q : queues) {
-        ((SerialGatewaySenderQueue)q).cleanUp();
-      }
-    }
-   
-    this.setIsPrimary(false);
-    new UpdateAttributesProcessor(this).distribute(false);
-    Thread lockObtainingThread = getSenderAdvisor().getLockObtainingThread();
-    if (lockObtainingThread != null && lockObtainingThread.isAlive()) {
-      // wait a while for thread to terminate
-      try {
-        lockObtainingThread.join(3000);
-      } catch (InterruptedException ex) {
-        // we allowed our join to be canceled
-        // reset interrupt bit so this thread knows it has been interrupted
-        Thread.currentThread().interrupt();
-      }
-      if (lockObtainingThread.isAlive()) {
-        
logger.info(LocalizedMessage.create(LocalizedStrings.GatewaySender_COULD_NOT_STOP_LOCK_OBTAINING_THREAD_DURING_GATEWAY_SENDER_STOP));
-      }
-    }
-    
-    InternalDistributedSystem system = (InternalDistributedSystem) this.cache
-        .getDistributedSystem();
-    system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_STOP, this);
-    
-    this.eventProcessor = null;
-  }
-  
-  @Override
-  public String toString() {
-    StringBuffer sb = new StringBuffer();
-    sb.append("SerialGatewaySender{");
-    sb.append("id=" + getId());
-    sb.append(",remoteDsId="+ getRemoteDSId());
-    sb.append(",isRunning ="+ isRunning());
-    sb.append(",isPrimary ="+ isPrimary());
-    sb.append("}");
-    return sb.toString();
-  }
- 
-  @Override
-  public void fillInProfile(Profile profile) {
-    assert profile instanceof GatewaySenderProfile;
-    GatewaySenderProfile pf = (GatewaySenderProfile)profile;
-    pf.Id = getId();
-    pf.startTime = getStartTime();
-    pf.remoteDSId = getRemoteDSId();
-    pf.isRunning = isRunning();
-    pf.isPrimary = isPrimary();
-    pf.isParallel = false;
-    pf.isBatchConflationEnabled = isBatchConflationEnabled();
-    pf.isPersistenceEnabled = isPersistenceEnabled();
-    pf.alertThreshold = getAlertThreshold();
-    pf.manualStart = isManualStart();
-    for (com.gemstone.gemfire.cache.wan.GatewayEventFilter filter : 
getGatewayEventFilters()) {
-      pf.eventFiltersClassNames.add(filter.getClass().getName());
-    }
-    for (GatewayTransportFilter filter : getGatewayTransportFilters()) {
-      pf.transFiltersClassNames.add(filter.getClass().getName());
-    }
-    for (AsyncEventListener listener : getAsyncEventListeners()) {
-      pf.senderEventListenerClassNames.add(listener.getClass().getName());
-    }
-    pf.isDiskSynchronous = isDiskSynchronous();
-    pf.dispatcherThreads = getDispatcherThreads();
-    pf.orderPolicy = getOrderPolicy();
-    pf.serverLocation = this.getServerLocation(); 
-  }
-
-  /* (non-Javadoc)
-   * @see 
com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender#setModifiedEventId(com.gemstone.gemfire.internal.cache.EntryEventImpl)
-   */
-  @Override
-  protected void setModifiedEventId(EntryEventImpl clonedEvent) {
-    EventID originalEventId = clonedEvent.getEventId();
-    long originalThreadId = originalEventId.getThreadID();
-    long newThreadId = originalThreadId;
-    if (ThreadIdentifier.isWanTypeThreadID(newThreadId)) {
-      // This thread id has already been converted. Do nothing.
-    } else {
-      newThreadId = ThreadIdentifier
-        .createFakeThreadIDForParallelGSPrimaryBucket(0, originalThreadId,
-            getEventIdIndex());
-    }
-    EventID newEventId = new EventID(originalEventId.getMembershipID(),
-        newThreadId, originalEventId.getSequenceID());
-    if (logger.isDebugEnabled()) {
-      logger.debug("{}: Generated event id for event with key={}, original 
event id={}, originalThreadId={}, new event id={}, newThreadId={}",
-          this, clonedEvent.getKey(), originalEventId, originalThreadId, 
newEventId, newThreadId);
-    }
-    clonedEvent.setEventId(newEventId);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/cache/client/internal/GatewaySenderBatchOp.java
----------------------------------------------------------------------
diff --git 
a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/GatewaySenderBatchOp.java
 
b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/GatewaySenderBatchOp.java
new file mode 100755
index 0000000..b042da0
--- /dev/null
+++ 
b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/GatewaySenderBatchOp.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.EventID;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ChunkedMessage;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+import com.gemstone.gemfire.internal.cache.wan.BatchException70;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl;
+import 
com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventRemoteDispatcher;
+import 
com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventRemoteDispatcher.GatewayAck;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+import java.net.SocketTimeoutException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.logging.log4j.Logger;
+
+@SuppressWarnings("unchecked")
+public class GatewaySenderBatchOp {
+  
+  private static final Logger logger = LogService.getLogger();
+  
+  /**
+   * Send a list of gateway events to a server to execute
+   * using connections from the given pool
+   * to communicate with the server.
+   * @param con the connection to send the message on.
+   * @param pool the pool to use to communicate with the server.
+   * @param events list of gateway events
+   * @param batchId the ID of this batch
+   * @param isRetry when true the outgoing message is flagged as a retry
+   */
+  public static void executeOn(Connection con, ExecutablePool pool, List events, int batchId, boolean isRetry)
+  {
+    // The earlier comparison of con.getWanSiteVersion() against
+    // Version.GFE_651 created exactly the same op in both branches, so the
+    // op is now built directly.
+    AbstractOp op = new GatewaySenderGFEBatchOpImpl(events, batchId, con.getDistributedSystemId(), isRetry);
+    pool.executeOn(con, op, true/*timeoutFatal*/);
+  }
+  
+  
+  /**
+   * Executes a part-less batch op on the given connection. Because the op's
+   * message has zero parts, its attempt() only reads a response.
+   * @param con the connection to read on.
+   * @param pool the pool used to execute the op.
+   * @return whatever the pool returns for the executed op
+   */
+  public static Object executeOn(Connection con, ExecutablePool pool) {
+    return pool.executeOn(con, new GatewaySenderGFEBatchOpImpl(), true/*timeoutFatal*/);
+  }
+                                                               
+  /** Private because this class only exposes static entry points. */
+  private GatewaySenderBatchOp() {
+    // no instances allowed
+  }
+  
+  static class GatewaySenderGFEBatchOpImpl extends AbstractOp {
+    
+    /**
+     * Builds the batch message: four header parts (event count, batch id,
+     * distributed system id and a one-byte removeFromQueueOnException flag)
+     * followed by the parts of every event.
+     *
+     * @param events the batch; every element is cast to GatewaySenderEventImpl
+     * @param batchId the ID of this batch
+     * @param dsId the distributed system id written into the header
+     * @param isRetry when true the message is flagged as a retry
+     * @throws com.gemstone.gemfire.SerializationException if serialization fails
+     */
+    public GatewaySenderGFEBatchOpImpl(List events, int batchId, int dsId, boolean isRetry)  {
+      super(MessageType.GATEWAY_RECEIVER_COMMAND, calcPartCount(events));
+      // The flag is always sent as 1 by this op.
+      final boolean removeFromQueueOnException = true;
+      if (isRetry) {
+        getMessage().setIsRetry();
+      }
+      getMessage().addIntPart(events.size());
+      getMessage().addIntPart(batchId);
+      getMessage().addIntPart(dsId);
+      getMessage().addBytesPart(
+          new byte[] { removeFromQueueOnException ? (byte)1 : (byte)0 });
+      // Add each event
+      for (Object element : events) {
+        GatewaySenderEventImpl event = (GatewaySenderEventImpl)element;
+        // Add action
+        int action = event.getAction();
+        getMessage().addIntPart(action);
+        { // Add posDup flag
+          byte posDupByte = (byte)(event.getPossibleDuplicate()?0x01:0x00);
+          getMessage().addBytesPart(new byte[] {posDupByte});
+        }
+        if (action >= 0 && action <= 3) {
+          // 0 = create
+          // 1 = update
+          // 2 = destroy
+          // NOTE(review): the guard also admits action 3, which is not
+          // described here - confirm its meaning against GatewaySenderEventImpl.
+          String regionName = event.getRegionPath();
+          EventID eventId = event.getEventId();
+          Object key = event.getKey();
+          Object callbackArg = event.getSenderCallbackArgument();
+
+          // Add region name
+          getMessage().addStringPart(regionName);
+          // Add event id
+          getMessage().addObjPart(eventId);
+          // Add key
+          getMessage().addStringOrObjPart(key);
+          if (action < 2 /* it is 0 or 1 */) {
+            byte[] value = event.getSerializedValue();
+            byte valueIsObject = event.getValueIsObject();
+            // Add value (which is already a serialized byte[])
+            getMessage().addRawPart(value, (valueIsObject == 0x01));
+          }
+          // Add callback arg if necessary: a one-byte presence marker,
+          // followed by the argument itself when present
+          if (callbackArg == null) {
+            getMessage().addBytesPart(new byte[] {0x00});
+          } else {
+            getMessage().addBytesPart(new byte[] {0x01});
+            getMessage().addObjPart(callbackArg);
+          }
+          getMessage().addLongPart(event.getVersionTimeStamp());
+        }
+      }
+    }
+
+    /**
+     * Creates an op whose message has zero parts; attempt() treats such an
+     * op as read-only and goes straight to reading a response.
+     */
+    public GatewaySenderGFEBatchOpImpl() {
+      super(MessageType.GATEWAY_RECEIVER_COMMAND, 0);
+    }
+
+    /**
+     * Runs this op on the connection. A message with no parts only reads a
+     * response (see attemptRead); otherwise the message is sent and no
+     * response is read here.
+     *
+     * @param cnx the connection to use
+     * @return the result of attemptRead for a part-less message, otherwise the
+     *         value of the failed flag (false once the send has succeeded)
+     * @throws Exception if sending or reading fails
+     */
+    @Override
+    public Object attempt(Connection cnx) throws Exception {
+      if (getMessage().getNumberOfParts() == 0) {
+        return attemptRead(cnx);
+      }
+      // Assume failure until the send completes so the stats calls in the
+      // finally blocks see the right state when attemptSend throws.
+      this.failed = true;
+      this.timedOut = false;
+      long start = startAttempt(cnx.getStats());
+      try {
+        try {
+          attemptSend(cnx);
+          this.failed = false;
+        } finally {
+          endSendAttempt(cnx.getStats(), start);
+        }
+      } finally {
+        endAttempt(cnx.getStats(), start);
+      }
+      return this.failed;
+    }
+    
+    /**
+     * Reads a response from the connection, maintaining the failed and
+     * timedOut flags.
+     *
+     * @param cnx the connection to read from
+     * @return the result of attemptReadResponse
+     * @throws SocketTimeoutException if the read timed out; timedOut is set
+     *         and failed is cleared before rethrowing
+     * @throws Exception any other failure, propagated with failed still set
+     */
+    private Object attemptRead(Connection cnx) throws Exception {
+      this.failed = true;
+      try {
+        Object result = attemptReadResponse(cnx);
+        this.failed = false;
+        return result;
+      } catch (SocketTimeoutException ste) {
+        // A read timeout is reported through timedOut rather than failed.
+        this.failed = false;
+        this.timedOut = true;
+        throw ste;
+      }
+    }
+    
+    
+    /**
+     * Attempts to read a response to this operation by reading it from the
+     * given connection, and returning it.
+     * @param cnx the connection to read the response from; it is cast to
+     *        ConnectionImpl to obtain the async-read comm buffer
+     * @return the result of the operation
+     *         or <code>null</code> if the operation has no result
+     *         (createResponseMessage() returned null).
+     * @throws Exception if the execute failed
+     */
+    protected Object attemptReadResponse(Connection cnx) throws Exception {
+      Message msg = createResponseMessage();
+      if (msg != null) {
+        msg.setComms(cnx.getSocket(), cnx.getInputStream(),
+            cnx.getOutputStream(),
+            ((ConnectionImpl)cnx).getCommBufferForAsyncRead(), cnx.getStats());
+        if (msg instanceof ChunkedMessage) {
+          // For a chunked message recv() is not called here; the message is
+          // handed to processResponse while its comms are still set.
+          try {
+            return processResponse(msg, cnx);
+          } finally {
+            msg.unsetComms();
+            // TODO (ashetkar) Handle the case when we fail to read the
+            // connection id.
+            processSecureBytes(cnx, msg);
+          }
+        }
+
+        // Non-chunked: receive the whole message, release the comms, then
+        // interpret it.
+        try {
+          msg.recv();
+        } finally {
+          msg.unsetComms();
+          processSecureBytes(cnx, msg);
+        }
+        return processResponse(msg, cnx);
+      }
+      
+      return null;
+    }
+    
+    
+    /**
+     * Computes the number of message parts needed for a batch: the four
+     * header parts plus whatever each event reports via getNumberOfParts().
+     *
+     * @param events the batch; every element is cast to GatewaySenderEventImpl
+     * @return the total part count for the message
+     */
+    private static int calcPartCount(List events) {
+      // header: number of events, batchId, dsId, removeFromQueueOnException flag
+      int numberOfParts = 4;
+      for (Object element : events) {
+        numberOfParts += ((GatewaySenderEventImpl)element).getNumberOfParts();
+      }
+      return numberOfParts;
+    }
+
+    /** Overridden with an empty body: this op does nothing with secure bytes. */
+    @Override
+    protected void processSecureBytes(Connection cnx, Message message)
+        throws Exception {
+    }
+
+    /** @return always false for this op */
+    @Override
+    protected boolean needsUserId() {
+      return false;
+    }
+
+    /**
+     * Clears the message's secure-part flag and sends it, passing false to
+     * Message.send.
+     */
+    @Override
+    protected void sendMessage(Connection cnx) throws Exception {
+      getMessage().clearMessageHasSecurePartFlag();
+      getMessage().send(false);
+    }
+
+    /**
+     * Converts the receiver's response into a GatewayAck.
+     * <ul>
+     * <li>REPLY whose first part is the single byte 0: no ack (null).</li>
+     * <li>Any other REPLY: ack built from the batch id (part 0) and the number
+     * of events (part 1).</li>
+     * <li>EXCEPTION carrying a List: the list is wrapped in a BatchException70
+     * and returned inside the ack instead of being thrown.</li>
+     * <li>EXCEPTION carrying a Throwable: rethrown wrapped in a
+     * ServerOperationException.</li>
+     * </ul>
+     * The message is always cleared before returning.
+     *
+     * @param msg the response message
+     * @return the ack, or null when the response carries none
+     * @throws ServerOperationException if the receiver sent a Throwable
+     * @throws InternalGemFireError if the message type is not recognized
+     */
+    @Override
+    protected Object processResponse(Message msg) throws Exception {
+      GatewayAck ack = null;
+      try {
+        // Read the header which describes the type of message following
+        switch (msg.getMessageType()) {
+        case MessageType.REPLY: {
+          // Read the chunk
+          Part part0 = msg.getPart(0);
+          if (part0.isBytes() && part0.getLength() == 1 && part0.getSerializedForm()[0] == 0) {
+            // REPLY_OKAY from a CloseConnection
+            break;
+          }
+          int batchId = part0.getInt();
+          int numEvents = msg.getPart(1).getInt();
+          ack = new GatewayAck(batchId, numEvents);
+          break;
+        }
+        case MessageType.EXCEPTION: {
+          Object obj = msg.getPart(0).getObject();
+          if (obj instanceof List) {
+            // Reuse the payload read above rather than reading it from the
+            // part a second time.
+            List<BatchException70> l = (List<BatchException70>)obj;
+
+            logger.info("We got an exception from the GatewayReceiver. MessageType : {} obj :{}", msg.getMessageType(), obj);
+            // don't throw Exception but set it in the Ack
+            BatchException70 be = new BatchException70(l);
+            ack = new GatewayAck(be, l.get(0).getBatchId());
+
+          } else if (obj instanceof Throwable) {
+            String s = ": While reading Ack from receiver "
+                + ((Throwable)obj).getMessage();
+            throw new ServerOperationException(s, (Throwable)obj);
+          }
+          break;
+        }
+        default:
+          throw new InternalGemFireError(
+              LocalizedStrings.Op_UNKNOWN_MESSAGE_TYPE_0
+                  .toLocalizedString(Integer.valueOf(msg.getMessageType())));
+        }
+      } finally {
+        msg.clear();
+      }
+      return ack;
+    }
+    
+    /**
+     * @return always false; EXCEPTION replies are interpreted in
+     *         processResponse instead
+     */
+    @Override
+    protected boolean isErrorResponse(int msgType) {
+      return false;
+    }
+    /** @return the start value handed out by stats.startGatewayBatch() */
+    @Override
+    protected long startAttempt(ConnectionStats stats) {
+      return stats.startGatewayBatch();
+    }
+    /** Reports the end of the send to the gateway-batch stats, with the failed flag. */
+    @Override
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      stats.endGatewayBatchSend(start, hasFailed());
+    }
+    /** Reports the end of the attempt to the gateway-batch stats, with the timed-out and failed flags. */
+    @Override
+    protected void endAttempt(ConnectionStats stats, long start) {
+      stats.endGatewayBatch(start, hasTimedOut(), hasFailed());
+    }
+    
+    /** @return always true: this op is a gateway sender op */
+    @Override
+    public boolean isGatewaySenderOp() {
+      return true;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/cache/client/internal/SenderProxy.java
----------------------------------------------------------------------
diff --git 
a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/SenderProxy.java
 
b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/SenderProxy.java
new file mode 100644
index 0000000..8647d12
--- /dev/null
+++ 
b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/SenderProxy.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.List;
+
+import com.gemstone.gemfire.cache.query.SelectResults;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+
+/**
+ * Used to send operations from a sender to a receiver.
+ * @since GemFire 8.1
+ */
+public class SenderProxy extends ServerProxy {
+
+  /**
+   * @param pool the pool passed on to the ServerProxy superclass
+   */
+  public SenderProxy(InternalPool pool) {
+    super(pool);
+  }
+
+  /**
+   * Sends a batch of events on the given connection by delegating to
+   * GatewaySenderBatchOp.
+   * @param con the connection to send on
+   * @param events the events of the batch
+   * @param batchId the ID of the batch
+   * @param isRetry whether the batch is being re-sent
+   */
+  public void dispatchBatch_NewWAN(Connection con, List events, int batchId, boolean isRetry) {
+    GatewaySenderBatchOp.executeOn(con, this.pool, events, batchId, isRetry);
+  }
+
+  /**
+   * Reads an acknowledgement on the given connection by delegating to
+   * GatewaySenderBatchOp.
+   * @param con the connection to read from
+   * @return the object produced by the read-only batch op
+   */
+  public Object receiveAckFromReceiver(Connection con) {
+    return GatewaySenderBatchOp.executeOn(con, this.pool);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorDiscovery.java
----------------------------------------------------------------------
diff --git 
a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorDiscovery.java
 
b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorDiscovery.java
new file mode 100644
index 0000000..60973ed
--- /dev/null
+++ 
b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorDiscovery.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.client.internal.locator.wan;
+
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.distributed.internal.WanLocatorDiscoverer;
+import com.gemstone.gemfire.distributed.internal.tcpserver.TcpClient;
+import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.internal.net.*;
+import com.gemstone.gemfire.internal.tcp.ConnectionException;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This class provides the runnable tasks which exchange locator information
+ * with local locators (within the site) as well as remote locators (across
+ * sites).
+ * 
+ * @since GemFire 7.0
+ */
+public class LocatorDiscovery{
+
+  private static final Logger logger = LogService.getLogger();
+
+  private WanLocatorDiscoverer discoverer;
+
+  private DistributionLocatorId locatorId;
+  
+  private LocatorMembershipListener locatorListener;
+  
+  RemoteLocatorJoinRequest request;
+  
+  TcpClient locatorClient;
+
+  public static final int WAN_LOCATOR_CONNECTION_RETRY_ATTEMPT = Integer
+      .getInteger("WANLocator.CONNECTION_RETRY_ATTEMPT", 50000).intValue();
+
+  public static final int WAN_LOCATOR_CONNECTION_INTERVAL = Integer.getInteger(
+      "WANLocator.CONNECTION_INTERVAL", 10000).intValue();
+
+  public static final int WAN_LOCATOR_PING_INTERVAL = Integer.getInteger(
+      "WANLocator.PING_INTERVAL", 10000).intValue();
+
+  /**
+   * Creates a discovery helper for a single locator and gives it its own
+   * TcpClient.
+   *
+   * @param discoverer the owning discoverer; the exchange loops stop once it
+   *        reports isStopped()
+   * @param locator the locator to exchange information with
+   * @param request the join request sent to that locator
+   * @param locatorListener the listener handed to
+   *        LocatorHelper.addExchangedLocators
+   */
+  public LocatorDiscovery(WanLocatorDiscoverer discoverer, DistributionLocatorId locator,
+      RemoteLocatorJoinRequest request, LocatorMembershipListener locatorListener) {
+    this.locatorClient = new TcpClient();
+    this.locatorListener = locatorListener;
+    this.request = request;
+    this.locatorId = locator;
+    this.discoverer = discoverer;
+  }
+  
+  /**
+   * For each locator whose information exchange has failed, keeps the last
+   * time a failure was logged and the current logging interval. We don't want
+   * to swamp the logs with retries against the same locator.
+   */
+  private final ConcurrentHashMap<DistributionLocatorId, long[]> 
failureLogInterval = new ConcurrentHashMap<DistributionLocatorId, long[]>();
+
+  /**
+   * The maximum size of {@link #failureLogInterval} beyond which it will start
+   * logging all failure instances. Hopefully this should never happen in
+   * practice.
+   */
+  private static final int FAILURE_MAP_MAXSIZE = Integer.getInteger(
+      DistributionConfig.GEMFIRE_PREFIX + "GatewaySender.FAILURE_MAP_MAXSIZE", 
1000000);
+
+  /**
+   * The maximum interval for logging failures for the same locator in millis.
+   */
+  private static final int FAILURE_LOG_MAX_INTERVAL = Integer.getInteger(
+      DistributionConfig.GEMFIRE_PREFIX + 
"LocatorDiscovery.FAILURE_LOG_MAX_INTERVAL", 300000);
+
+  /**
+   * Decides whether a failure for the given locator should go unlogged.
+   * The first failure seen for a locator is never skipped. Later failures
+   * are skipped while they fall inside the locator's current interval.
+   * When one falls outside it, the timestamp is refreshed and the interval
+   * is doubled as long as it is at most half of FAILURE_LOG_MAX_INTERVAL.
+   * Nothing is skipped once the map has reached FAILURE_MAP_MAXSIZE entries.
+   *
+   * @param locatorId the locator the failure relates to
+   * @return true if logging should be skipped, false if it should be logged
+   */
+  public final boolean skipFailureLogging(DistributionLocatorId locatorId) {
+    if (this.failureLogInterval.size() >= FAILURE_MAP_MAXSIZE) {
+      return false;
+    }
+    // entry[0] = time of the last logged failure, entry[1] = interval in ms
+    long[] entry = this.failureLogInterval.get(locatorId);
+    if (entry == null) {
+      entry = this.failureLogInterval.putIfAbsent(locatorId,
+          new long[] { System.currentTimeMillis(), 1000 });
+    }
+    if (entry == null) {
+      // this call registered the locator: first failure, so log it
+      return false;
+    }
+    final long now = System.currentTimeMillis();
+    if ((now - entry[0]) < entry[1]) {
+      return true;
+    }
+    entry[0] = now;
+    if (entry[1] <= (FAILURE_LOG_MAX_INTERVAL / 2)) {
+      entry[1] *= 2;
+    }
+    return false;
+  }
+
+
+  /** Runnable that performs the local locator exchange of the enclosing instance. */
+  public class LocalLocatorDiscovery implements Runnable {
+    @Override
+    public void run() {
+      exchangeLocalLocators();
+    }
+  }
+
+  /** Runnable that performs the remote locator exchange of the enclosing instance. */
+  public class RemoteLocatorDiscovery implements Runnable {
+    @Override
+    public void run() {
+      exchangeRemoteLocators();
+    }
+  }
+
+  /** @return the discoverer passed to the constructor */
+  private WanLocatorDiscoverer getDiscoverer() {
+    return this.discoverer;
+  }
+  
+  /**
+   * Sends the join request to the configured locator and registers the
+   * locators it returns. On IOException the request is retried every
+   * WAN_LOCATOR_CONNECTION_INTERVAL ms until a response arrives, the
+   * discoverer is stopped, or WAN_LOCATOR_CONNECTION_RETRY_ATTEMPT attempts
+   * have failed. Retry warnings are throttled via skipFailureLogging.
+   */
+  private void exchangeLocalLocators() {
+    int retryAttempt = 1;
+    while (!getDiscoverer().isStopped()) {
+      try {
+        RemoteLocatorJoinResponse response = (RemoteLocatorJoinResponse)locatorClient
+            .requestToServer(locatorId.getHost(), locatorId.getPort(), request,
+                WanLocatorDiscoverer.WAN_LOCATOR_CONNECTION_TIMEOUT);
+        // NOTE(review): a null response loops again immediately, without
+        // sleeping or counting the attempt - confirm this is intended.
+        if (response != null) {
+          LocatorHelper.addExchangedLocators(response.getLocators(),
+              this.locatorListener);
+          logger.info(LocalizedMessage.create(LocalizedStrings.LOCATOR_DISCOVERY_TASK_EXCHANGED_LOCATOR_INFORMATION_0_WITH_1,
+              new Object[] { request.getLocator(), locatorId, response.getLocators() }));
+          break;
+        }
+      }
+      catch (IOException ioe) {
+        if (retryAttempt == WAN_LOCATOR_CONNECTION_RETRY_ATTEMPT) {
+          ConnectionException coe = new ConnectionException(
+              "Not able to connect to local locator after "
+              + WAN_LOCATOR_CONNECTION_RETRY_ATTEMPT + " retry attempts",
+          ioe);
+          logger.fatal(LocalizedMessage.create(LocalizedStrings.LOCATOR_DISCOVERY_TASK_COULD_NOT_EXCHANGE_LOCATOR_INFORMATION_0_WITH_1_AFTER_2,
+              new Object[] { request.getLocator(),locatorId, retryAttempt }), coe);
+          break;
+        }
+        // skipFailureLogging returns true when the warning should be
+        // suppressed, so log only when it returns false.
+        if (!skipFailureLogging(locatorId)) {
+          logger.warn(LocalizedMessage.create(LocalizedStrings.LOCATOR_DISCOVERY_TASK_COULD_NOT_EXCHANGE_LOCATOR_INFORMATION_0_WITH_1_AFTER_2_RETRYING_IN_3_MS,
+              new Object[] { request.getLocator(), locatorId, retryAttempt, WAN_LOCATOR_CONNECTION_INTERVAL }));
+        }
+        try {
+          Thread.sleep(WAN_LOCATOR_CONNECTION_INTERVAL);
+        }
+        catch (InterruptedException e) {
+          // restore the interrupt status; the loop itself keeps running
+          Thread.currentThread().interrupt();
+        }
+        retryAttempt++;
+        continue;
+      }
+      catch (ClassNotFoundException classNotFoundException) {
+        logger.fatal(LocalizedMessage.create(LocalizedStrings.LOCATOR_DISCOVERY_TASK_ENCOUNTERED_UNEXPECTED_EXCEPTION), classNotFoundException);
+        break;
+      }
+    }
+  }
+  
+  /**
+   * Exchanges locator information with the remote locator {@code locatorId}
+   * and then keeps pinging it.
+   * <p>
+   * After a successful exchange the response's locators are merged into
+   * {@code locatorListener} and the remote locator is pinged every
+   * {@code WAN_LOCATOR_PING_INTERVAL} milliseconds. When a ping returns no
+   * response the join request is sent again. The method returns when the
+   * discoverer is stopped, the calling thread is interrupted,
+   * {@code WAN_LOCATOR_CONNECTION_RETRY_ATTEMPT} attempts have failed with an
+   * {@link IOException}, or a {@link ClassNotFoundException} is raised.
+   */
+  public void exchangeRemoteLocators() {
+    int retryAttempt = 1;
+    DistributionLocatorId remoteLocator = this.locatorId;
+    while (!getDiscoverer().isStopped()) {
+      try {
+        RemoteLocatorJoinResponse response =
+            (RemoteLocatorJoinResponse) locatorClient.requestToServer(
+                remoteLocator.getHost(), remoteLocator.getPort(), request,
+                WanLocatorDiscoverer.WAN_LOCATOR_CONNECTION_TIMEOUT);
+        if (response != null) {
+          LocatorHelper.addExchangedLocators(response.getLocators(),
+              this.locatorListener);
+          logger.info(LocalizedMessage.create(
+              LocalizedStrings.LOCATOR_DISCOVERY_TASK_EXCHANGED_LOCATOR_INFORMATION_0_WITH_1,
+              new Object[] { request.getLocator(), remoteLocator,
+                  response.getLocators() }));
+          RemoteLocatorPingRequest pingRequest =
+              new RemoteLocatorPingRequest("");
+          RemoteLocatorPingResponse pingResponse;
+          // Keep pinging while the remote locator answers; also leave the
+          // loop once the discoverer has been stopped so that a stop request
+          // is honoured even while pings keep succeeding.
+          do {
+            Thread.sleep(WAN_LOCATOR_PING_INTERVAL);
+            pingResponse =
+                (RemoteLocatorPingResponse) locatorClient.requestToServer(
+                    remoteLocator.getHost(), remoteLocator.getPort(),
+                    pingRequest,
+                    WanLocatorDiscoverer.WAN_LOCATOR_CONNECTION_TIMEOUT);
+          } while (pingResponse != null && !getDiscoverer().isStopped());
+        }
+      }
+      catch (IOException ioe) {
+        if (retryAttempt == WAN_LOCATOR_CONNECTION_RETRY_ATTEMPT) {
+          logger.fatal(LocalizedMessage.create(
+              LocalizedStrings.LOCATOR_DISCOVERY_TASK_COULD_NOT_EXCHANGE_LOCATOR_INFORMATION_0_WITH_1_AFTER_2,
+              new Object[] { request.getLocator(), remoteLocator,
+                  retryAttempt }), ioe);
+          break;
+        }
+        // skipFailureLogging() returns true when the warning must be
+        // suppressed, so the warning is emitted only when it returns false.
+        if (!skipFailureLogging(remoteLocator)) {
+          logger.warn(LocalizedMessage.create(
+              LocalizedStrings.LOCATOR_DISCOVERY_TASK_COULD_NOT_EXCHANGE_LOCATOR_INFORMATION_0_WITH_1_AFTER_2_RETRYING_IN_3_MS,
+              new Object[] { request.getLocator(), remoteLocator,
+                  retryAttempt, WAN_LOCATOR_CONNECTION_INTERVAL }));
+        }
+        try {
+          Thread.sleep(WAN_LOCATOR_CONNECTION_INTERVAL);
+        }
+        catch (InterruptedException e) {
+          // Restore the flag and stop retrying.
+          Thread.currentThread().interrupt();
+          break;
+        }
+        retryAttempt++;
+      }
+      catch (ClassNotFoundException classNotFoundException) {
+        logger.fatal(LocalizedMessage.create(
+            LocalizedStrings.LOCATOR_DISCOVERY_TASK_ENCOUNTERED_UNEXPECTED_EXCEPTION),
+            classNotFoundException);
+        break;
+      }
+      catch (InterruptedException e) {
+        // Restore the flag and leave the loop. Looping on with the flag set
+        // would make every later sleep fail at once and turn this method
+        // into a tight loop of join requests.
+        Thread.currentThread().interrupt();
+        break;
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorHelper.java
----------------------------------------------------------------------
diff --git 
a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorHelper.java
 
b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorHelper.java
new file mode 100644
index 0000000..83b6db3
--- /dev/null
+++ 
b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorHelper.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.client.internal.locator.wan;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import com.gemstone.gemfire.distributed.internal.InternalLocator;
+import 
com.gemstone.gemfire.cache.client.internal.locator.wan.LocatorMembershipListener;
+import com.gemstone.gemfire.distributed.internal.WanLocatorDiscoverer;
+import com.gemstone.gemfire.internal.CopyOnWriteHashSet;
+import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId;
+/**
+ * Helper class that adds locator information to the allLocatorInfoMap.
+ * 
+ *
+ */
+public class LocatorHelper {
+
+  public final static Object locatorObject = new Object();
+
+  /**
+   * Adds the given locator to the listener's all-locators map and, if it was
+   * not already known, notifies the listener through
+   * {@code locatorJoined}.
+   *
+   * @param distributedSystemId id of the distributed system the locator
+   *        belongs to
+   * @param locator the locator to add
+   * @param locatorListener listener owning the locator maps
+   * @param sourceLocator locator passed through to {@code locatorJoined};
+   *        callers in this module pass {@code null} when there is none
+   * @return {@code true} if the locator was added, {@code false} if it was
+   *         already registered for that distributed system
+   */
+  public static boolean addLocator(int distributedSystemId,
+      DistributionLocatorId locator, LocatorMembershipListener locatorListener,
+      DistributionLocatorId sourceLocator) {
+    ConcurrentMap<Integer, Set<DistributionLocatorId>> allLocatorsInfo =
+        (ConcurrentMap<Integer, Set<DistributionLocatorId>>) locatorListener
+            .getAllLocatorsInfo();
+    Set<DistributionLocatorId> locatorsSet =
+        new CopyOnWriteHashSet<DistributionLocatorId>();
+    locatorsSet.add(locator);
+    Set<DistributionLocatorId> existingValue =
+        allLocatorsInfo.putIfAbsent(distributedSystemId, locatorsSet);
+    // Rely on the result of add() instead of contains()+add(): a separate
+    // check would let two concurrent callers both report the same locator
+    // as newly joined.
+    if (existingValue != null && !existingValue.add(locator)) {
+      return false;
+    }
+    addServerLocator(distributedSystemId, locatorListener, locator);
+    locatorListener.locatorJoined(distributedSystemId, locator, sourceLocator);
+    return true;
+  }
+
+  /**
+   * Records the given locator in the listener's server-locators map when it
+   * is a server locator; does nothing otherwise.
+   *
+   * @param distributedSystemId id of the distributed system the locator
+   *        belongs to
+   * @param locatorListener listener owning the locator maps
+   * @param locator the locator to record (stored as its {@code toString()})
+   */
+  private static void addServerLocator(Integer distributedSystemId,
+      LocatorMembershipListener locatorListener,
+      DistributionLocatorId locator) {
+    if (!locator.isServerLocator()) {
+      return;
+    }
+    ConcurrentMap<Integer, Set<String>> allServerLocatorsInfo =
+        (ConcurrentMap<Integer, Set<String>>) locatorListener
+            .getAllServerLocatorsInfo();
+
+    Set<String> locatorsSet = new CopyOnWriteHashSet<String>();
+    locatorsSet.add(locator.toString());
+    Set<String> existingValue =
+        allServerLocatorsInfo.putIfAbsent(distributedSystemId, locatorsSet);
+    if (existingValue != null) {
+      // Set.add is a no-op for an element that is already present.
+      existingValue.add(locator.toString());
+    }
+  }
+
+  /**
+   * Merges a map of locators received from another locator into the
+   * listener's all-locators map and notifies the listener about every
+   * locator that was not known before. The supplied map and its sets are
+   * not modified.
+   *
+   * @param locators locators received from the other locator, keyed by
+   *        distributed system id
+   * @param locatorListener listener owning the locator maps
+   * @return {@code false} if the listener's map already equalled
+   *         {@code locators}, {@code true} otherwise
+   */
+  public static boolean addExchangedLocators(
+      Map<Integer, Set<DistributionLocatorId>> locators,
+      LocatorMembershipListener locatorListener) {
+
+    ConcurrentMap<Integer, Set<DistributionLocatorId>> allLocators =
+        (ConcurrentMap<Integer, Set<DistributionLocatorId>>) locatorListener
+            .getAllLocatorsInfo();
+    if (allLocators.equals(locators)) {
+      return false;
+    }
+    for (Map.Entry<Integer, Set<DistributionLocatorId>> entry : locators
+        .entrySet()) {
+      Integer dsId = entry.getKey();
+      Set<DistributionLocatorId> existingValue = allLocators.putIfAbsent(dsId,
+          new CopyOnWriteHashSet<DistributionLocatorId>(entry.getValue()));
+
+      Set<DistributionLocatorId> joined;
+      if (existingValue == null) {
+        // The whole set was new for this distributed system.
+        joined = entry.getValue();
+      }
+      else {
+        // Work on a private copy so the caller's sets stay untouched, and
+        // use the set returned by putIfAbsent rather than reading the map
+        // again (a second read could observe a concurrently cleared map).
+        joined = new HashSet<DistributionLocatorId>(entry.getValue());
+        joined.removeAll(existingValue);
+        existingValue.addAll(joined);
+      }
+      for (DistributionLocatorId locator : joined) {
+        addServerLocator(dsId, locatorListener, locator);
+        locatorListener.locatorJoined(dsId, locator, null);
+      }
+    }
+    return true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorJoinMessage.java
----------------------------------------------------------------------
diff --git 
a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorJoinMessage.java
 
b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorJoinMessage.java
new file mode 100644
index 0000000..86ae0d6
--- /dev/null
+++ 
b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorJoinMessage.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.client.internal.locator.wan;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.DataSerializer;
+import 
com.gemstone.gemfire.cache.client.internal.locator.ServerLocationRequest;
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId;
+
+/**
+ * Message announcing that a locator has joined a distributed system. It
+ * carries the joined locator, the id of its distributed system and the
+ * locator that sent the announcement.
+ */
+public class LocatorJoinMessage extends ServerLocationRequest {
+
+  private DistributionLocatorId locator;
+
+  private int distributedSystemId;
+
+  private DistributionLocatorId sourceLocator;
+
+  /** No-arg constructor; the fields are populated by {@link #fromData}. */
+  public LocatorJoinMessage() {
+    super();
+  }
+
+  /**
+   * @param distributedSystemId id of the distributed system of the locator
+   * @param locator the locator that joined
+   * @param sourceLocator the locator sending this message
+   * @param serverGroup passed to the {@code ServerLocationRequest} super
+   *        constructor
+   */
+  public LocatorJoinMessage(int distributedSystemId,
+      DistributionLocatorId locator, DistributionLocatorId sourceLocator,
+      String serverGroup) {
+    super(serverGroup);
+    this.locator = locator;
+    this.distributedSystemId = distributedSystemId;
+    this.sourceLocator = sourceLocator;
+  }
+
+  /** Reads the fields in the order in which {@link #toData} writes them. */
+  public void fromData(DataInput in) throws IOException,
+      ClassNotFoundException {
+    super.fromData(in);
+    this.locator = DataSerializer.readObject(in);
+    this.distributedSystemId = in.readInt();
+    this.sourceLocator = DataSerializer.readObject(in);
+  }
+
+  /** Writes locator, distributed system id and source locator, in order. */
+  public void toData(DataOutput out) throws IOException {
+    super.toData(out);
+    DataSerializer.writeObject(locator, out);
+    out.writeInt(this.distributedSystemId);
+    DataSerializer.writeObject(sourceLocator, out);
+  }
+
+  public DistributionLocatorId getLocator() {
+    return this.locator;
+  }
+
+  public int getDistributedSystemId() {
+    return distributedSystemId;
+  }
+
+  public DistributionLocatorId getSourceLocator() {
+    return sourceLocator;
+  }
+
+  public int getDSFID() {
+    return DataSerializableFixedID.LOCATOR_JOIN_MESSAGE;
+  }
+
+  @Override
+  public String toString() {
+    return "LocatorJoinMessage{distributedSystemId=" + distributedSystemId
+        + " locators=" + locator + " Source Locator : " + sourceLocator + "}";
+  }
+
+  /**
+   * Two messages are equal when they have the same distributed system id
+   * and the same locator; the source locator is not compared. A
+   * {@code null} locator is only equal to another {@code null} locator.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof LocatorJoinMessage)) {
+      return false;
+    }
+    LocatorJoinMessage other = (LocatorJoinMessage) obj;
+    if (this.distributedSystemId != other.getDistributedSystemId()) {
+      return false;
+    }
+    // hashCode() accepts a null locator, so equals() must not throw on one.
+    DistributionLocatorId otherLocator = other.getLocator();
+    return this.locator == null ? otherLocator == null
+        : this.locator.equals(otherLocator);
+  }
+
+  @Override
+  public int hashCode() {
+    // it is sufficient for all messages having the same locator to hash to
+    // the same bucket
+    if (this.locator == null) {
+      return 0;
+    } else {
+      return this.locator.hashCode();
+    }
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerImpl.java
----------------------------------------------------------------------
diff --git 
a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerImpl.java
 
b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerImpl.java
new file mode 100644
index 0000000..01e01e7
--- /dev/null
+++ 
b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerImpl.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.client.internal.locator.wan;
+
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.distributed.internal.tcpserver.TcpClient;
+import com.gemstone.gemfire.internal.CopyOnWriteHashSet;
+import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * An implementation of
+ * {@link com.gemstone.gemfire.cache.client.internal.locator.wan.LocatorMembershipListener}.
+ * 
+ * 
+ */
+public class LocatorMembershipListenerImpl implements
+    LocatorMembershipListener {
+
+  /** Known locators, keyed by distributed system id. */
+  private ConcurrentMap<Integer, Set<DistributionLocatorId>> allLocatorsInfo =
+      new ConcurrentHashMap<Integer, Set<DistributionLocatorId>>();
+
+  /** String forms of server locators, keyed by distributed system id. */
+  private ConcurrentMap<Integer, Set<String>> allServerLocatorsInfo =
+      new ConcurrentHashMap<Integer, Set<String>>();
+
+  private static final Logger logger = LogService.getLogger();
+
+  private DistributionConfig config;
+
+  private TcpClient tcpClient;
+
+  private int port;
+
+  public LocatorMembershipListenerImpl() {
+    this.tcpClient = new TcpClient();
+  }
+
+  public void setPort(int port) {
+    this.port = port;
+  }
+
+  public void setConfig(DistributionConfig config) {
+    this.config = config;
+  }
+
+  /**
+   * When the new locator is added to remote locator metadata, inform all other
+   * locators in remote locator metadata about the new locator so that they can
+   * update their remote locator metadata. The new locator is in turn told
+   * about each of those other locators. The work runs on a newly started
+   * daemon thread; this method returns without waiting for it.
+   *
+   * @param distributedSystemId id of the distributed system of the new locator
+   * @param locator the locator that joined
+   * @param sourceLocator locator excluded from the notification together
+   *        with the local locator and {@code locator}; may be {@code null}
+   */
+  public void locatorJoined(final int distributedSystemId,
+      final DistributionLocatorId locator,
+      final DistributionLocatorId sourceLocator) {
+    Thread distributeLocator = new Thread(new Runnable() {
+      public void run() {
+        ConcurrentMap<Integer, Set<DistributionLocatorId>> remoteLocators =
+            getAllLocatorsInfo();
+        ArrayList<DistributionLocatorId> locatorsToRemove =
+            new ArrayList<DistributionLocatorId>();
+
+        String localLocator = config.getStartLocator();
+        DistributionLocatorId localLocatorId = null;
+        if (localLocator.equals(DistributionConfig.DEFAULT_START_LOCATOR)) {
+          localLocatorId = new DistributionLocatorId(port, config
+              .getBindAddress());
+        }
+        else {
+          localLocatorId = new DistributionLocatorId(localLocator);
+        }
+        locatorsToRemove.add(localLocatorId);
+        locatorsToRemove.add(locator);
+        locatorsToRemove.add(sourceLocator);
+
+        // Work on a copy so that removing the excluded locators below does
+        // not touch the live metadata.
+        Map<Integer, Set<DistributionLocatorId>> localCopy =
+            new HashMap<Integer, Set<DistributionLocatorId>>();
+        for (Map.Entry<Integer, Set<DistributionLocatorId>> entry :
+            remoteLocators.entrySet()) {
+          Set<DistributionLocatorId> value =
+              new CopyOnWriteHashSet<DistributionLocatorId>(entry.getValue());
+          localCopy.put(entry.getKey(), value);
+        }
+        for (Map.Entry<Integer, Set<DistributionLocatorId>> entry :
+            localCopy.entrySet()) {
+          for (DistributionLocatorId removeLocId : locatorsToRemove) {
+            if (entry.getValue().contains(removeLocId)) {
+              entry.getValue().remove(removeLocId);
+            }
+          }
+          for (DistributionLocatorId value : entry.getValue()) {
+            // Tell the already known locator about the new one ...
+            sendJoinMessage(value, distributedSystemId, locator,
+                localLocatorId);
+            // ... and the new locator about the already known one.
+            sendJoinMessage(locator, entry.getKey(), value, localLocatorId);
+          }
+        }
+      }
+    });
+    distributeLocator.setDaemon(true);
+    distributeLocator.start();
+  }
+
+  /**
+   * Sends a {@link LocatorJoinMessage} about {@code joined} to {@code target}
+   * without expecting a reply. Failures are best-effort: they are logged at
+   * debug level, including the cause, and otherwise ignored.
+   */
+  private void sendJoinMessage(DistributionLocatorId target,
+      int distributedSystemId, DistributionLocatorId joined,
+      DistributionLocatorId localLocatorId) {
+    try {
+      tcpClient.requestToServer(target.getHost(), target.getPort(),
+          new LocatorJoinMessage(distributedSystemId, joined, localLocatorId,
+              ""), 1000, false);
+    }
+    catch (Exception e) {
+      if (logger.isDebugEnabled()) {
+        logger.debug(LocalizedMessage.create(
+            LocalizedStrings.LOCATOR_MEMBERSHIP_LISTENER_COULD_NOT_EXCHANGE_LOCATOR_INFORMATION_0_1_WIHT_2_3,
+            new Object[] { joined.getHost(), joined.getPort(),
+                target.getHost(), target.getPort() }), e);
+      }
+    }
+  }
+
+  /**
+   * Dispatches on the concrete request type.
+   *
+   * @return the response for the request, or {@code null} for a
+   *         {@link LocatorJoinMessage} and for unrecognised request types
+   */
+  public Object handleRequest(Object request) {
+    Object response = null;
+    if (request instanceof RemoteLocatorJoinRequest) {
+      response = updateAllLocatorInfo((RemoteLocatorJoinRequest)request);
+    }
+    else if (request instanceof LocatorJoinMessage) {
+      response = informAboutRemoteLocators((LocatorJoinMessage)request);
+    }
+    else if (request instanceof RemoteLocatorPingRequest) {
+      response = getPingResponse((RemoteLocatorPingRequest)request);
+    }
+    else if (request instanceof RemoteLocatorRequest) {
+      response = getRemoteLocators((RemoteLocatorRequest)request);
+    }
+    return response;
+  }
+
+  /**
+   * A locator from the request is checked against the existing remote locator
+   * metadata. If it is not available then it is added to the existing remote
+   * locator metadata and LocatorMembershipListener is invoked to inform all
+   * other locators available in remote locator metadata about this newly
+   * added locator. As a response, remote locator metadata is sent.
+   *
+   * @param request the join request carrying the locator and its
+   *        distributed system id
+   */
+  private synchronized Object updateAllLocatorInfo(
+      RemoteLocatorJoinRequest request) {
+    int distributedSystemId = request.getDistributedSystemId();
+    DistributionLocatorId locator = request.getLocator();
+
+    LocatorHelper.addLocator(distributedSystemId, locator, this, null);
+    return new RemoteLocatorJoinResponse(this.getAllLocatorsInfo());
+  }
+
+  private Object getPingResponse(RemoteLocatorPingRequest request) {
+    return new RemoteLocatorPingResponse();
+  }
+
+  /**
+   * Registers the locator announced by the message; no response is sent.
+   */
+  private Object informAboutRemoteLocators(LocatorJoinMessage request) {
+    // TODO: Find out the importance of list locatorJoinMessages. During
+    // refactoring I could not understand its significance
+//    synchronized (locatorJoinObject) {
+//      if (locatorJoinMessages.contains(request)) {
+//        return null;
+//      }
+//      locatorJoinMessages.add(request);
+//    }
+    int distributedSystemId = request.getDistributedSystemId();
+    DistributionLocatorId locator = request.getLocator();
+    DistributionLocatorId sourceLocatorId = request.getSourceLocator();
+
+    LocatorHelper.addLocator(distributedSystemId, locator, this,
+        sourceLocatorId);
+    return null;
+  }
+
+  private Object getRemoteLocators(RemoteLocatorRequest request) {
+    int dsId = request.getDsId();
+    Set<String> locators = this.getRemoteLocatorInfo(dsId);
+    return new RemoteLocatorResponse(locators);
+  }
+
+  /**
+   * @return the server locators recorded for the given distributed system
+   *         id, or {@code null} if none are recorded
+   */
+  public Set<String> getRemoteLocatorInfo(int dsId) {
+    return this.allServerLocatorsInfo.get(dsId);
+  }
+
+  public ConcurrentMap<Integer,Set<DistributionLocatorId>>
+      getAllLocatorsInfo() {
+    return this.allLocatorsInfo;
+  }
+
+  public ConcurrentMap<Integer,Set<String>> getAllServerLocatorsInfo() {
+    return this.allServerLocatorsInfo;
+  }
+
+  public void clearLocatorInfo() {
+    allLocatorsInfo.clear();
+    allServerLocatorsInfo.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorJoinRequest.java
----------------------------------------------------------------------
diff --git 
a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorJoinRequest.java
 
b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorJoinRequest.java
new file mode 100644
index 0000000..c058333
--- /dev/null
+++ 
b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorJoinRequest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.client.internal.locator.wan;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.DataSerializer;
+import 
com.gemstone.gemfire.cache.client.internal.locator.ServerLocationRequest;
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId;
+
+/**
+ * Requests remote locators of a remote WAN site
+ * 
+ * 
+ * @since GemFire 6.6
+ * 
+ */
+public class RemoteLocatorJoinRequest implements DataSerializableFixedID {
+
+  /** Locator carried by this request; null until set or deserialized. */
+  private DistributionLocatorId locator;
+
+  /** Distributed system id carried by this request; -1 until set. */
+  private int distributedSystemId = -1;
+
+  /** No-arg constructor; the fields are populated by {@link #fromData}. */
+  public RemoteLocatorJoinRequest() {
+  }
+
+  /**
+   * @param distributedSystemId distributed system id to carry
+   * @param locator locator to carry
+   * @param serverGroup accepted but not stored by this class
+   */
+  public RemoteLocatorJoinRequest(int distributedSystemId,
+      DistributionLocatorId locator, String serverGroup) {
+    this.locator = locator;
+    this.distributedSystemId = distributedSystemId;
+  }
+
+  /** Reads the locator followed by the distributed system id. */
+  public void fromData(DataInput in) throws IOException,
+      ClassNotFoundException {
+    locator = DataSerializer.readObject(in);
+    distributedSystemId = in.readInt();
+  }
+
+  /** Writes the locator followed by the distributed system id. */
+  public void toData(DataOutput out) throws IOException {
+    DataSerializer.writeObject(this.locator, out);
+    out.writeInt(distributedSystemId);
+  }
+
+  public DistributionLocatorId getLocator() {
+    return locator;
+  }
+
+  public int getDistributedSystemId() {
+    return this.distributedSystemId;
+  }
+
+  public int getDSFID() {
+    return DataSerializableFixedID.REMOTE_LOCATOR_JOIN_REQUEST;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder text = new StringBuilder("RemoteLocatorJoinRequest{locator=");
+    text.append(locator).append("}");
+    return text.toString();
+  }
+
+  @Override
+  public Version[] getSerializationVersions() {
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorJoinResponse.java
----------------------------------------------------------------------
diff --git 
a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorJoinResponse.java
 
b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorJoinResponse.java
new file mode 100644
index 0000000..42c3bb0
--- /dev/null
+++ 
b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorJoinResponse.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.client.internal.locator.wan;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.internal.CopyOnWriteHashSet;
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId;
+
+/**
+ * List of remote locators as a response
+ * 
+ * 
+ * 
+ */
+public class RemoteLocatorJoinResponse implements DataSerializableFixedID{
+
+  /** Locators carried by this response, keyed by distributed system id. */
+  private HashMap<Integer, Set<DistributionLocatorId>> locators =
+      new HashMap<Integer, Set<DistributionLocatorId>>();
+
+  /** Used by DataSerializer */
+  public RemoteLocatorJoinResponse() {
+  }
+
+  /**
+   * Builds a response holding a copy of the given map; every value set is
+   * copied into a new {@code CopyOnWriteHashSet}.
+   *
+   * @param locators locators to copy, keyed by distributed system id
+   */
+  public RemoteLocatorJoinResponse(
+      Map<Integer, Set<DistributionLocatorId>> locators) {
+    // The field initializer has already provided an empty map to fill.
+    for (Map.Entry<Integer, Set<DistributionLocatorId>> source :
+        locators.entrySet()) {
+      Set<DistributionLocatorId> copy =
+          new CopyOnWriteHashSet<DistributionLocatorId>(source.getValue());
+      this.locators.put(source.getKey(), copy);
+    }
+  }
+
+  /** Replaces the locator map with the one read from the stream. */
+  public void fromData(DataInput in) throws IOException,
+      ClassNotFoundException {
+    locators = DataSerializer.readHashMap(in);
+  }
+
+  /** Writes the locator map to the stream. */
+  public void toData(DataOutput out) throws IOException {
+    DataSerializer.writeHashMap(this.locators, out);
+  }
+
+  public Map<Integer, Set<DistributionLocatorId>> getLocators() {
+    return locators;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder text =
+        new StringBuilder("RemoteLocatorJoinResponse{locators=");
+    text.append(locators).append("}");
+    return text.toString();
+  }
+
+  public int getDSFID() {
+    return DataSerializableFixedID.REMOTE_LOCATOR_JOIN_RESPONSE;
+  }
+
+  @Override
+  public Version[] getSerializationVersions() {
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorPingRequest.java
----------------------------------------------------------------------
diff --git 
a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorPingRequest.java
 
b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorPingRequest.java
new file mode 100644
index 0000000..a1cb951
--- /dev/null
+++ 
b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorPingRequest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.client.internal.locator.wan;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.Version;
+
+/**
+ * Ping request message identified by REMOTE_LOCATOR_PING_REQUEST.
+ * It carries no payload: toData and fromData are both empty.
+ */
+
+public class RemoteLocatorPingRequest implements DataSerializableFixedID{
+
+  public RemoteLocatorPingRequest() {
+    super();
+  }
+
+  public RemoteLocatorPingRequest(String serverGroup) { // serverGroup is accepted but never stored or used
+  }
+
+  public void fromData(DataInput in) throws IOException, 
ClassNotFoundException {
+  }
+
+  public void toData(DataOutput out) throws IOException {
+  }
+
+  public int getDSFID() {
+    return DataSerializableFixedID.REMOTE_LOCATOR_PING_REQUEST;
+  }
+
+  @Override
+  public Version[] getSerializationVersions() {
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorPingResponse.java
----------------------------------------------------------------------
diff --git 
a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorPingResponse.java
 
b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorPingResponse.java
new file mode 100644
index 0000000..54fdc04
--- /dev/null
+++ 
b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorPingResponse.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.client.internal.locator.wan;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.Version;
+
+/**
+ * Ping response message (REMOTE_LOCATOR_PING_RESPONSE); toData and fromData are empty, so nothing is serialized.
+ */
+public class RemoteLocatorPingResponse implements DataSerializableFixedID {
+
+
+  /** Used by DataSerializer */
+  public RemoteLocatorPingResponse() {
+    super();
+  }
+
+  public void fromData(DataInput in) throws IOException, 
ClassNotFoundException {
+  }
+
+  public void toData(DataOutput out) throws IOException {
+  }
+
+
+
+  public int getDSFID() {
+    return DataSerializableFixedID.REMOTE_LOCATOR_PING_RESPONSE;
+  }
+
+  @Override
+  public Version[] getSerializationVersions() {
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorRequest.java
----------------------------------------------------------------------
diff --git 
a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorRequest.java
 
b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorRequest.java
new file mode 100644
index 0000000..d23ca93
--- /dev/null
+++ 
b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorRequest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.client.internal.locator.wan;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.Version;
+/**
+ * Request message (REMOTE_LOCATOR_REQUEST) whose only serialized field is an int
+ * distributed system id, written by toData and read back by fromData.
+ */
+public class RemoteLocatorRequest implements DataSerializableFixedID{
+  private int distributedSystemId ;
+
+  public RemoteLocatorRequest() {
+    super();
+  }
+  public RemoteLocatorRequest(int dsId, String serverGroup) {
+    this.distributedSystemId = dsId; // serverGroup is not stored or used
+  }
+  
+  public void fromData(DataInput in) throws IOException, 
ClassNotFoundException {
+    this.distributedSystemId = in.readInt();
+  }
+
+  public void toData(DataOutput out) throws IOException {
+    out.writeInt(this.distributedSystemId);
+  }
+
+  public int getDsId() {
+    return this.distributedSystemId;
+  }
+  
+  public int getDSFID() {
+    return DataSerializableFixedID.REMOTE_LOCATOR_REQUEST;
+  }
+
+  @Override
+  public String toString() {
+    return "RemoteLocatorRequest{dsName=" + distributedSystemId + "}"; // NOTE(review): labelled "dsName" but the value is the int id
+  }
+  @Override
+  public Version[] getSerializationVersions() {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorResponse.java
----------------------------------------------------------------------
diff --git 
a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorResponse.java
 
b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorResponse.java
new file mode 100644
index 0000000..a7ff6fd
--- /dev/null
+++ 
b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorResponse.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.client.internal.locator.wan;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Set;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.Version;
+
+/**
+ * Response message (REMOTE_LOCATOR_RESPONSE) wrapping a set of locator strings,
+ * serialized with DataSerializer.writeObject and read back with DataSerializer.readObject.
+ */
+public class RemoteLocatorResponse implements DataSerializableFixedID{
+
+  private Set<String> locators ; // no initializer: null after the no-arg constructor until fromData assigns it
+
+  /** Used by DataSerializer */
+  public RemoteLocatorResponse() {
+    super();
+  }
+  
+  public RemoteLocatorResponse(Set<String> locators) {
+    this.locators = locators; // keeps the caller's set by reference (no copy)
+  }
+  
+  public void fromData(DataInput in) throws IOException, 
ClassNotFoundException {
+    this.locators = DataSerializer.readObject(in);
+  }
+
+  public void toData(DataOutput out) throws IOException {
+    DataSerializer.writeObject(this.locators, out);
+  }
+
+  public Set<String> getLocators() {
+    return this.locators; // returns the stored set itself, not a copy
+  }
+  
+  @Override
+  public String toString() {
+    return "RemoteLocatorResponse{locators=" + locators +"}";
+  }
+
+  public int getDSFID() {
+    return DataSerializableFixedID.REMOTE_LOCATOR_RESPONSE;
+  }
+
+  @Override
+  public Version[] getSerializationVersions() {
+     return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/WANFactoryImpl.java
----------------------------------------------------------------------
diff --git 
a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/WANFactoryImpl.java
 
b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/WANFactoryImpl.java
new file mode 100644
index 0000000..2844594
--- /dev/null
+++ 
b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/WANFactoryImpl.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.client.internal.locator.wan;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory;
+import com.gemstone.gemfire.cache.wan.GatewaySenderFactory;
+import com.gemstone.gemfire.distributed.internal.WanLocatorDiscoverer;
+import com.gemstone.gemfire.internal.DSFIDFactory;
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.cache.wan.GatewayReceiverFactoryImpl;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderFactoryImpl;
+import com.gemstone.gemfire.internal.cache.wan.spi.WANFactory;
+
+public class WANFactoryImpl implements WANFactory {
+  
+  @Override
+  public void initialize() { // registers seven message classes with DSFIDFactory, each under its DataSerializableFixedID constant
+    DSFIDFactory.registerDSFID(
+        DataSerializableFixedID.REMOTE_LOCATOR_JOIN_REQUEST,
+        RemoteLocatorJoinRequest.class);
+    DSFIDFactory.registerDSFID(
+        DataSerializableFixedID.REMOTE_LOCATOR_JOIN_RESPONSE,
+        RemoteLocatorJoinResponse.class);
+    DSFIDFactory.registerDSFID(DataSerializableFixedID.REMOTE_LOCATOR_REQUEST,
+        RemoteLocatorRequest.class);
+    DSFIDFactory.registerDSFID(DataSerializableFixedID.LOCATOR_JOIN_MESSAGE,
+        LocatorJoinMessage.class);
+    DSFIDFactory.registerDSFID(
+        DataSerializableFixedID.REMOTE_LOCATOR_PING_REQUEST,
+        RemoteLocatorPingRequest.class);
+    DSFIDFactory.registerDSFID(
+        DataSerializableFixedID.REMOTE_LOCATOR_PING_RESPONSE,
+        RemoteLocatorPingResponse.class);
+    DSFIDFactory.registerDSFID(DataSerializableFixedID.REMOTE_LOCATOR_RESPONSE,
+        RemoteLocatorResponse.class);
+  }
+
+  @Override
+  public GatewaySenderFactory createGatewaySenderFactory(Cache cache) {
+    return new GatewaySenderFactoryImpl(cache); // new instance on every call
+  }
+
+  @Override
+  public GatewayReceiverFactory createGatewayReceiverFactory(Cache cache) {
+    return new GatewayReceiverFactoryImpl(cache); // new instance on every call
+  }
+
+  @Override
+  public WanLocatorDiscoverer createLocatorDiscoverer() {
+    return new WanLocatorDiscovererImpl(); // new instance on every call
+  }
+
+  @Override
+  public LocatorMembershipListener createLocatorMembershipListener() {
+    return new LocatorMembershipListenerImpl(); // new instance on every call
+  }
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/WanLocatorDiscovererImpl.java
----------------------------------------------------------------------
diff --git 
a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/WanLocatorDiscovererImpl.java
 
b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/WanLocatorDiscovererImpl.java
new file mode 100644
index 0000000..1d44e65
--- /dev/null
+++ 
b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/WanLocatorDiscovererImpl.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.client.internal.locator.wan;
+
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.distributed.internal.DistributionConfigImpl;
+import com.gemstone.gemfire.distributed.internal.WanLocatorDiscoverer;
+import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
+import org.apache.logging.log4j.Logger;
+
+import java.util.StringTokenizer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+public class WanLocatorDiscovererImpl implements WanLocatorDiscoverer{
+
+  private static final Logger logger = LogService.getLogger();
+
+  private volatile boolean stopped = false;
+  
+  private ExecutorService _executor;
+  
+  public WanLocatorDiscovererImpl() {
+    
+  }
+  
+  @Override
+  public void discover(int port,
+                       DistributionConfigImpl config,
+                       LocatorMembershipListener locatorListener,
+                       final String hostnameForClients) {
+    final LoggingThreadGroup loggingThreadGroup = LoggingThreadGroup
+        .createThreadGroup("WAN Locator Discovery Logger Group", logger);
+
+    final ThreadFactory threadFactory = new ThreadFactory() {
+      public Thread newThread(final Runnable task) {
+        final Thread thread = new Thread(loggingThreadGroup, task,
+            "WAN Locator Discovery Thread");
+        thread.setDaemon(true);
+        return thread;
+      }
+    };
+
+    this._executor = Executors.newCachedThreadPool(threadFactory); // a new pool is created on every discover() call
+    exchangeLocalLocators(port, config, locatorListener, hostnameForClients);
+    exchangeRemoteLocators(port, config, locatorListener, hostnameForClients);
+    this._executor.shutdown(); // no new tasks accepted; the discovery tasks submitted above still run
+  }
+
+  @Override
+  public void stop() {
+    this.stopped = true; // only flips the flag; the executor is not touched here
+  }
+
+  @Override
+  public boolean isStopped() {
+    return this.stopped;
+  }
+
+  /**
+   * For WAN 70 Exchange the locator information within the distributed system
+   * Adds this locator via LocatorHelper.addLocator, then queues a LocalLocatorDiscovery for each other entry of config.getLocators().
+   * @param config source of the start locator, bind address, distributed system id and locator list
+   * @param hostnameForClients passed to DistributionLocatorId only when the start locator is the default
+   */
+  private void exchangeLocalLocators(int port,
+                                     DistributionConfigImpl config,
+                                     LocatorMembershipListener locatorListener,
+                                     final String hostnameForClients) {
+    String localLocator = config.getStartLocator();
+    DistributionLocatorId locatorId = null;
+    if (localLocator.equals(DistributionConfig.DEFAULT_START_LOCATOR)) {
+      locatorId = new DistributionLocatorId(port, config.getBindAddress(), 
hostnameForClients);
+    }
+    else {
+      locatorId = new DistributionLocatorId(localLocator);
+    }
+    LocatorHelper.addLocator(config.getDistributedSystemId(), locatorId, 
locatorListener, null);
+
+    RemoteLocatorJoinRequest request = buildRemoteDSJoinRequest(port, config, 
hostnameForClients);
+    StringTokenizer locatorsOnThisVM = new StringTokenizer(
+        config.getLocators(), ",");
+    while (locatorsOnThisVM.hasMoreTokens()) {
+      DistributionLocatorId localLocatorId = new DistributionLocatorId(
+          locatorsOnThisVM.nextToken());
+      if (!locatorId.equals(localLocatorId)) { // skip the entry that equals this locator's own id
+        LocatorDiscovery localDiscovery = new LocatorDiscovery(this, 
localLocatorId, request, locatorListener);
+        LocatorDiscovery.LocalLocatorDiscovery localLocatorDiscovery = 
localDiscovery.new LocalLocatorDiscovery();
+        this._executor.execute(localLocatorDiscovery);
+      }
+    }
+  }
+  
+  /**
+   * For WAN 70 Exchange the locator information across the distributed systems
+   * (sites). Queues a RemoteLocatorDiscovery for each comma-separated entry of config.getRemoteLocators().
+   *
+   * @param config source of the remote locator list and of the join request's fields
+   * @param hostnameForClients forwarded to buildRemoteDSJoinRequest
+   */
+  private void exchangeRemoteLocators(int port,
+                                      DistributionConfigImpl config,
+                                      LocatorMembershipListener 
locatorListener,
+                                      final String hostnameForClients) {
+    RemoteLocatorJoinRequest request = buildRemoteDSJoinRequest(port, config, 
hostnameForClients);
+    String remoteDistributedSystems = config.getRemoteLocators();
+    if (remoteDistributedSystems.length() > 0) {
+      StringTokenizer remoteLocators = new StringTokenizer(
+          remoteDistributedSystems, ",");
+      while (remoteLocators.hasMoreTokens()) {
+        DistributionLocatorId remoteLocatorId = new DistributionLocatorId(
+            remoteLocators.nextToken());
+        LocatorDiscovery localDiscovery = new LocatorDiscovery(this, 
remoteLocatorId,
+            request, locatorListener); // NOTE(review): named localDiscovery but it wraps a remote locator id
+        LocatorDiscovery.RemoteLocatorDiscovery remoteLocatorDiscovery = 
localDiscovery.new RemoteLocatorDiscovery();
+        this._executor.execute(remoteLocatorDiscovery);
+      }
+    }
+  }
+  
+  private RemoteLocatorJoinRequest buildRemoteDSJoinRequest(int port,
+                                                            
DistributionConfigImpl config,
+                                                            final String 
hostnameForClients) {
+    String localLocator = config.getStartLocator();
+    DistributionLocatorId locatorId = null;
+    if (localLocator.equals(DistributionConfig.DEFAULT_START_LOCATOR)) { // same locator-id selection as in exchangeLocalLocators
+      locatorId = new DistributionLocatorId(port, config.getBindAddress(), 
hostnameForClients);
+    }
+    else {
+      locatorId = new DistributionLocatorId(localLocator);
+    }
+    RemoteLocatorJoinRequest request = new RemoteLocatorJoinRequest(
+        config.getDistributedSystemId(), locatorId, ""); // third argument is always the empty string
+    return request;
+  }
+  
+}

Reply via email to