http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderImpl.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderImpl.java b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderImpl.java deleted file mode 100644 index 85e1bc0..0000000 --- a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderImpl.java +++ /dev/null @@ -1,256 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.wan.serial; -import java.util.Set; - -import org.apache.logging.log4j.Logger; - -import com.gemstone.gemfire.cache.Cache; -import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener; -import com.gemstone.gemfire.cache.wan.GatewayTransportFilter; -import com.gemstone.gemfire.distributed.DistributedLockService; -import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile; -import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; -import com.gemstone.gemfire.distributed.internal.ResourceEvent; -import com.gemstone.gemfire.internal.cache.EntryEventImpl; -import com.gemstone.gemfire.internal.cache.EventID; -import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; -import com.gemstone.gemfire.internal.cache.RegionQueue; -import com.gemstone.gemfire.internal.cache.UpdateAttributesProcessor; -import com.gemstone.gemfire.internal.cache.ha.ThreadIdentifier; -import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor; -import com.gemstone.gemfire.internal.cache.wan.AbstractRemoteGatewaySender; -import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAdvisor.GatewaySenderProfile; -import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAttributes; -import com.gemstone.gemfire.internal.cache.wan.GatewaySenderConfigurationException; -import com.gemstone.gemfire.internal.i18n.LocalizedStrings; -import com.gemstone.gemfire.internal.logging.LogService; -import com.gemstone.gemfire.internal.logging.LoggingThreadGroup; -import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; - -/** - * @since GemFire 7.0 - * - */ -public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender { - - private static final Logger logger = LogService.getLogger(); - - final ThreadGroup loggerGroup = LoggingThreadGroup.createThreadGroup( - "Remote Site Discovery Logger Group", logger); - - public SerialGatewaySenderImpl(){ - super(); - this.isParallel = false; - } - public SerialGatewaySenderImpl(Cache cache, - GatewaySenderAttributes attrs) { - super(cache, attrs); - } - - @Override - public void start() { - if (logger.isDebugEnabled()) { - logger.debug("Starting gatewaySender : {}", this); - } - - this.getLifeCycleLock().writeLock().lock(); - try { - if (isRunning()) { - logger.warn(LocalizedMessage.create(LocalizedStrings.GatewaySender_SENDER_0_IS_ALREADY_RUNNING, this.getId())); - return; - } - if (this.remoteDSId != DEFAULT_DISTRIBUTED_SYSTEM_ID) { - String locators = ((GemFireCacheImpl)this.cache).getDistributedSystem() - .getConfig().getLocators(); - if (locators.length() == 0) { - throw new GatewaySenderConfigurationException( - LocalizedStrings.AbstractGatewaySender_LOCATOR_SHOULD_BE_CONFIGURED_BEFORE_STARTING_GATEWAY_SENDER - .toLocalizedString()); - } - } - getSenderAdvisor().initDLockService(); - if (!isPrimary()) { - if (getSenderAdvisor().volunteerForPrimary()) { - getSenderAdvisor().makePrimary(); - } else { - getSenderAdvisor().makeSecondary(); - } - } - if (getDispatcherThreads() > 1) { - eventProcessor = new RemoteConcurrentSerialGatewaySenderEventProcessor( - SerialGatewaySenderImpl.this); - } else { - eventProcessor = new RemoteSerialGatewaySenderEventProcessor( - SerialGatewaySenderImpl.this, getId()); - } - eventProcessor.start(); - waitForRunningStatus(); - this.startTime = System.currentTimeMillis(); - - //Only notify the type registry if this is a WAN gateway queue - if(!isAsyncEventQueue()) { - ((GemFireCacheImpl) getCache()).getPdxRegistry().gatewaySenderStarted(this); - } - new UpdateAttributesProcessor(this).distribute(false); - - - InternalDistributedSystem system = (InternalDistributedSystem) this.cache - .getDistributedSystem(); - system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_START, this); - - logger.info(LocalizedMessage.create(LocalizedStrings.SerialGatewaySenderImpl_STARTED__0, this)); - - enqueueTempEvents(); - } finally { - this.getLifeCycleLock().writeLock().unlock(); - } - } - - @Override - public void stop() { - if (logger.isDebugEnabled()) { - logger.debug("Stopping Gateway Sender : {}", this); - } - this.getLifeCycleLock().writeLock().lock(); - try { - // Stop the dispatcher - AbstractGatewaySenderEventProcessor ev = this.eventProcessor; - if (ev != null && !ev.isStopped()) { - ev.stopProcessing(); - } - - // Stop the proxy (after the dispatcher, so the socket is still - // alive until after the dispatcher has stopped) - stompProxyDead(); - - // Close the listeners - for (AsyncEventListener listener : this.listeners) { - listener.close(); - } - logger.info(LocalizedMessage.create(LocalizedStrings.GatewayImpl_STOPPED__0, this)); - - clearTempEventsAfterSenderStopped(); - } finally { - this.getLifeCycleLock().writeLock().unlock(); - } - - if (this.isPrimary()) { - try { - DistributedLockService - .destroy(getSenderAdvisor().getDLockServiceName()); - } catch (IllegalArgumentException e) { - // service not found... ignore - } - } - Set<RegionQueue> queues = getQueues(); - if (queues != null && !queues.isEmpty()) { - for (RegionQueue q : queues) { - ((SerialGatewaySenderQueue)q).cleanUp(); - } - } - - this.setIsPrimary(false); - new UpdateAttributesProcessor(this).distribute(false); - Thread lockObtainingThread = getSenderAdvisor().getLockObtainingThread(); - if (lockObtainingThread != null && lockObtainingThread.isAlive()) { - // wait a while for thread to terminate - try { - lockObtainingThread.join(3000); - } catch (InterruptedException ex) { - // we allowed our join to be canceled - // reset interrupt bit so this thread knows it has been interrupted - Thread.currentThread().interrupt(); - } - if (lockObtainingThread.isAlive()) { - logger.info(LocalizedMessage.create(LocalizedStrings.GatewaySender_COULD_NOT_STOP_LOCK_OBTAINING_THREAD_DURING_GATEWAY_SENDER_STOP)); - } - } - - InternalDistributedSystem system = (InternalDistributedSystem) this.cache - .getDistributedSystem(); - system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_STOP, this); - - this.eventProcessor = null; - } - - @Override - public String toString() { - StringBuffer sb = new StringBuffer(); - sb.append("SerialGatewaySender{"); - sb.append("id=" + getId()); - sb.append(",remoteDsId="+ getRemoteDSId()); - sb.append(",isRunning ="+ isRunning()); - sb.append(",isPrimary ="+ isPrimary()); - sb.append("}"); - return sb.toString(); - } - - @Override - public void fillInProfile(Profile profile) { - assert profile instanceof GatewaySenderProfile; - GatewaySenderProfile pf = (GatewaySenderProfile)profile; - pf.Id = getId(); - pf.startTime = getStartTime(); - pf.remoteDSId = getRemoteDSId(); - pf.isRunning = isRunning(); - pf.isPrimary = isPrimary(); - pf.isParallel = false; - pf.isBatchConflationEnabled = isBatchConflationEnabled(); - pf.isPersistenceEnabled = isPersistenceEnabled(); - pf.alertThreshold = getAlertThreshold(); - pf.manualStart = isManualStart(); - for (com.gemstone.gemfire.cache.wan.GatewayEventFilter filter : getGatewayEventFilters()) { - pf.eventFiltersClassNames.add(filter.getClass().getName()); - } - for (GatewayTransportFilter filter : getGatewayTransportFilters()) { - pf.transFiltersClassNames.add(filter.getClass().getName()); - } - for (AsyncEventListener listener : getAsyncEventListeners()) { - pf.senderEventListenerClassNames.add(listener.getClass().getName()); - } - pf.isDiskSynchronous = isDiskSynchronous(); - pf.dispatcherThreads = getDispatcherThreads(); - pf.orderPolicy = getOrderPolicy(); - pf.serverLocation = this.getServerLocation(); - } - - /* (non-Javadoc) - * @see com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender#setModifiedEventId(com.gemstone.gemfire.internal.cache.EntryEventImpl) - */ - @Override - protected void setModifiedEventId(EntryEventImpl clonedEvent) { - EventID originalEventId = clonedEvent.getEventId(); - long originalThreadId = originalEventId.getThreadID(); - long newThreadId = originalThreadId; - if (ThreadIdentifier.isWanTypeThreadID(newThreadId)) { - // This thread id has already been converted. Do nothing. - } else { - newThreadId = ThreadIdentifier - .createFakeThreadIDForParallelGSPrimaryBucket(0, originalThreadId, - getEventIdIndex()); - } - EventID newEventId = new EventID(originalEventId.getMembershipID(), - newThreadId, originalEventId.getSequenceID()); - if (logger.isDebugEnabled()) { - logger.debug("{}: Generated event id for event with key={}, original event id={}, originalThreadId={}, new event id={}, newThreadId={}", - this, clonedEvent.getKey(), originalEventId, originalThreadId, newEventId, newThreadId); - } - clonedEvent.setEventId(newEventId); - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/cache/client/internal/GatewaySenderBatchOp.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/GatewaySenderBatchOp.java b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/GatewaySenderBatchOp.java new file mode 100755 index 0000000..b042da0 --- /dev/null +++ b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/GatewaySenderBatchOp.java @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.cache.client.internal; + +import com.gemstone.gemfire.InternalGemFireError; +import com.gemstone.gemfire.cache.client.ServerOperationException; +import com.gemstone.gemfire.internal.Version; +import com.gemstone.gemfire.internal.cache.EventID; +import com.gemstone.gemfire.internal.cache.tier.MessageType; +import com.gemstone.gemfire.internal.cache.tier.sockets.ChunkedMessage; +import com.gemstone.gemfire.internal.cache.tier.sockets.Message; +import com.gemstone.gemfire.internal.cache.tier.sockets.Part; +import com.gemstone.gemfire.internal.cache.wan.BatchException70; +import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl; +import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventRemoteDispatcher; +import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventRemoteDispatcher.GatewayAck; +import com.gemstone.gemfire.internal.i18n.LocalizedStrings; +import com.gemstone.gemfire.internal.logging.LogService; + +import java.net.SocketTimeoutException; +import java.util.Iterator; +import java.util.List; + +import org.apache.logging.log4j.Logger; + +@SuppressWarnings("unchecked") +public class GatewaySenderBatchOp { + + private static final Logger logger = LogService.getLogger(); + + /** + * Send a list of gateway events to a server to execute + * using connections from the given pool + * to communicate with the server. + * @param con the connection to send the message on. + * @param pool the pool to use to communicate with the server. + * @param events list of gateway events + * @param batchId the ID of this batch + */ + public static void executeOn(Connection con, ExecutablePool pool, List events, int batchId, boolean isRetry) + { + AbstractOp op = null; + //System.out.println("Version: "+con.getWanSiteVersion()); + //Is this check even needed anymore? It looks like we just create the same exact op impl with the same parameters... + if (Version.GFE_651.compareTo(con.getWanSiteVersion()) >= 0) { + op = new GatewaySenderGFEBatchOpImpl(events, batchId, con.getDistributedSystemId(), isRetry); + } else { + // Default should create a batch of server version (ACCEPTOR.VERSION) + op = new GatewaySenderGFEBatchOpImpl(events, batchId, con.getDistributedSystemId(), isRetry); + } + pool.executeOn(con, op, true/*timeoutFatal*/); + } + + + public static Object executeOn(Connection con, ExecutablePool pool) + { + AbstractOp op = new GatewaySenderGFEBatchOpImpl(); + return pool.executeOn(con, op, true/*timeoutFatal*/); + } + + private GatewaySenderBatchOp() { + // no instances allowed + } + + static class GatewaySenderGFEBatchOpImpl extends AbstractOp { + + /** + * @throws com.gemstone.gemfire.SerializationException if serialization fails + */ + public GatewaySenderGFEBatchOpImpl(List events, int batchId, int dsId, boolean isRetry) { + super(MessageType.GATEWAY_RECEIVER_COMMAND, calcPartCount(events)); + boolean removeFromQueueOnException = true; + if (isRetry) { + getMessage().setIsRetry(); + } + getMessage().addIntPart(events.size()); + getMessage().addIntPart(batchId); + getMessage().addIntPart(dsId); + getMessage().addBytesPart( + new byte[] { removeFromQueueOnException ? (byte)1 : (byte)0 }); + // Add each event + for (Iterator i = events.iterator(); i.hasNext();) { + GatewaySenderEventImpl event = (GatewaySenderEventImpl)i.next(); + // Add action + int action = event.getAction(); + getMessage().addIntPart(action); + { // Add posDup flag + byte posDupByte = (byte)(event.getPossibleDuplicate()?0x01:0x00); + getMessage().addBytesPart(new byte[] {posDupByte}); + } + if (action >= 0 && action <= 3) { + // 0 = create + // 1 = update + // 2 = destroy + String regionName = event.getRegionPath(); + EventID eventId = event.getEventId(); + Object key = event.getKey(); + Object callbackArg = event.getSenderCallbackArgument(); + + // Add region name + getMessage().addStringPart(regionName); + // Add event id + getMessage().addObjPart(eventId); + // Add key + getMessage().addStringOrObjPart(key); + if (action < 2 /* it is 0 or 1 */) { + byte[] value = event.getSerializedValue(); + byte valueIsObject = event.getValueIsObject();; + // Add value (which is already a serialized byte[]) + getMessage().addRawPart(value, (valueIsObject == 0x01)); + } + // Add callback arg if necessary + if (callbackArg == null) { + getMessage().addBytesPart(new byte[] {0x00}); + } else { + getMessage().addBytesPart(new byte[] {0x01}); + getMessage().addObjPart(callbackArg); + } + getMessage().addLongPart(event.getVersionTimeStamp()); + } + } + } + + public GatewaySenderGFEBatchOpImpl() { + super(MessageType.GATEWAY_RECEIVER_COMMAND, 0); + } + + @Override + public Object attempt(Connection cnx) throws Exception { + if (getMessage().getNumberOfParts() == 0) { + return attemptRead(cnx); + } + this.failed = true; + this.timedOut = false; + long start = startAttempt(cnx.getStats()); + try { + try { + attemptSend(cnx); + this.failed = false; + } finally { + endSendAttempt(cnx.getStats(), start); + } + } finally { + endAttempt(cnx.getStats(), start); + } + return this.failed; + } + + private Object attemptRead(Connection cnx) throws Exception { + this.failed = true; + try { + Object result = attemptReadResponse(cnx); + this.failed = false; + return result; + } catch (SocketTimeoutException ste) { + this.failed = false; + this.timedOut = true; + throw ste; + } catch (Exception e) { + throw e; + } + } + + + /** + * Attempts to read a response to this operation by reading it from the + * given connection, and returning it. + * @param cnx the connection to read the response from + * @return the result of the operation + * or <code>null</code> if the operation has no result. + * @throws Exception if the execute failed + */ + protected Object attemptReadResponse(Connection cnx) throws Exception { + Message msg = createResponseMessage(); + if (msg != null) { + msg.setComms(cnx.getSocket(), cnx.getInputStream(), + cnx.getOutputStream(), + ((ConnectionImpl)cnx).getCommBufferForAsyncRead(), cnx.getStats()); + if (msg instanceof ChunkedMessage) { + try { + return processResponse(msg, cnx); + } finally { + msg.unsetComms(); + // TODO (ashetkar) Handle the case when we fail to read the + // connection id. + processSecureBytes(cnx, msg); + } + } + + try { + msg.recv(); + } finally { + msg.unsetComms(); + processSecureBytes(cnx, msg); + } + return processResponse(msg, cnx); + } + + return null; + } + + + private static int calcPartCount(List events) { + int numberOfParts = 4; // for the number of events and the batchId + for (Iterator i = events.iterator(); i.hasNext();) { + GatewaySenderEventImpl event = (GatewaySenderEventImpl)i.next(); + numberOfParts += event.getNumberOfParts(); + } + return numberOfParts; + } + + @Override + protected void processSecureBytes(Connection cnx, Message message) + throws Exception { + } + + @Override + protected boolean needsUserId() { + return false; + } + + @Override + protected void sendMessage(Connection cnx) throws Exception { + getMessage().clearMessageHasSecurePartFlag(); + getMessage().send(false); + } + + @Override + protected Object processResponse(Message msg) throws Exception { + GatewayAck ack = null; + try { + // Read the header which describes the type of message following + switch (msg.getMessageType()) { + case MessageType.REPLY: + // Read the chunk + Part part0 = msg.getPart(0); + if (part0.isBytes() && part0.getLength() == 1 && part0.getSerializedForm()[0] == 0) { + // REPLY_OKAY from a CloseConnection + break; + } + int batchId = part0.getInt(); + int numEvents = msg.getPart(1).getInt(); + ack = new GatewayAck(batchId, numEvents); + break; + case MessageType.EXCEPTION: + part0 = msg.getPart(0); + + Object obj = part0.getObject(); + if (obj instanceof List) { + List<BatchException70> l = (List<BatchException70>)part0.getObject(); + + // if (logger.isDebugEnabled()) { + logger.info("We got an exception from the GatewayReceiver. MessageType : {} obj :{}", msg.getMessageType(), obj); + //} + // don't throw Exception but set it in the Ack + BatchException70 be = new BatchException70(l); + ack = new GatewayAck(be, l.get(0).getBatchId()); + + } else if (obj instanceof Throwable) { + String s = ": While reading Ack from receiver " + + ((Throwable)obj).getMessage(); + throw new ServerOperationException(s, (Throwable)obj); + } + break; + default: + throw new InternalGemFireError( + LocalizedStrings.Op_UNKNOWN_MESSAGE_TYPE_0 + .toLocalizedString(Integer.valueOf(msg.getMessageType()))); + } + } finally { + msg.clear(); + } + return ack; + } + + @Override + protected boolean isErrorResponse(int msgType) { + return false; + } + @Override + protected long startAttempt(ConnectionStats stats) { + return stats.startGatewayBatch(); + } + @Override + protected void endSendAttempt(ConnectionStats stats, long start) { + stats.endGatewayBatchSend(start, hasFailed()); + } + @Override + protected void endAttempt(ConnectionStats stats, long start) { + stats.endGatewayBatch(start, hasTimedOut(), hasFailed()); + } + + @Override + public boolean isGatewaySenderOp() { + return true; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/cache/client/internal/SenderProxy.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/SenderProxy.java b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/SenderProxy.java new file mode 100644 index 0000000..8647d12 --- /dev/null +++ b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/SenderProxy.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.cache.client.internal; + +import java.util.List; + +import com.gemstone.gemfire.cache.query.SelectResults; +import com.gemstone.gemfire.distributed.internal.ServerLocation; + +/** + * Used to send operations from a sender to a receiver. + * @since GemFire 8.1 + */ +public class SenderProxy extends ServerProxy{ + public SenderProxy(InternalPool pool) { + super(pool); + } + + public void dispatchBatch_NewWAN(Connection con, List events, int batchId, boolean isRetry) + { + GatewaySenderBatchOp.executeOn(con, this.pool, events, batchId, isRetry); + } + + public Object receiveAckFromReceiver(Connection con) + { + return GatewaySenderBatchOp.executeOn(con, this.pool); + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorDiscovery.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorDiscovery.java b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorDiscovery.java new file mode 100644 index 0000000..60973ed --- /dev/null +++ b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorDiscovery.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.cache.client.internal.locator.wan; + +import com.gemstone.gemfire.distributed.internal.DistributionConfig; +import com.gemstone.gemfire.distributed.internal.WanLocatorDiscoverer; +import com.gemstone.gemfire.distributed.internal.tcpserver.TcpClient; +import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId; +import com.gemstone.gemfire.internal.i18n.LocalizedStrings; +import com.gemstone.gemfire.internal.logging.LogService; +import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; +import com.gemstone.gemfire.internal.net.*; +import com.gemstone.gemfire.internal.tcp.ConnectionException; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; + +/** + * This class represent a runnable task which exchange the locator information + * with local locators(within the site) as well as remote locators (across the + * site) + * + * @since GemFire 7.0 + */ +public class LocatorDiscovery{ + + private static final Logger logger = LogService.getLogger(); + + private WanLocatorDiscoverer discoverer; + + private DistributionLocatorId locatorId; + + private LocatorMembershipListener locatorListener; + + RemoteLocatorJoinRequest request; + + TcpClient locatorClient; + + public static final int WAN_LOCATOR_CONNECTION_RETRY_ATTEMPT = Integer + .getInteger("WANLocator.CONNECTION_RETRY_ATTEMPT", 50000).intValue(); + + public static final int WAN_LOCATOR_CONNECTION_INTERVAL = Integer.getInteger( + "WANLocator.CONNECTION_INTERVAL", 10000).intValue(); + + public static final int WAN_LOCATOR_PING_INTERVAL = Integer.getInteger( + "WANLocator.PING_INTERVAL", 10000).intValue(); + + public LocatorDiscovery(WanLocatorDiscoverer discoverer, DistributionLocatorId locator,RemoteLocatorJoinRequest request, + LocatorMembershipListener locatorListener) { + this.discoverer = discoverer; + this.locatorId = locator; + this.request = request; + this.locatorListener = locatorListener; + this.locatorClient = new TcpClient(); + } + + /** + * When a batch fails, then this keeps the last time when a failure was logged + * . We don't want to swamp the logs in retries due to same batch failures. + */ + private final ConcurrentHashMap<DistributionLocatorId, long[]> failureLogInterval = new ConcurrentHashMap<DistributionLocatorId, long[]>(); + + /** + * The maximum size of {@link #failureLogInterval} beyond which it will start + * logging all failure instances. Hopefully this should never happen in + * practice. + */ + private static final int FAILURE_MAP_MAXSIZE = Integer.getInteger( + DistributionConfig.GEMFIRE_PREFIX + "GatewaySender.FAILURE_MAP_MAXSIZE", 1000000); + + /** + * The maximum interval for logging failures of the same event in millis. + */ + private static final int FAILURE_LOG_MAX_INTERVAL = Integer.getInteger( + DistributionConfig.GEMFIRE_PREFIX + "LocatorDiscovery.FAILURE_LOG_MAX_INTERVAL", 300000); + + public final boolean skipFailureLogging(DistributionLocatorId locatorId) { + boolean skipLogging = false; + if (this.failureLogInterval.size() < FAILURE_MAP_MAXSIZE) { + long[] logInterval = this.failureLogInterval.get(locatorId); + if (logInterval == null) { + logInterval = this.failureLogInterval.putIfAbsent(locatorId, + new long[] { System.currentTimeMillis(), 1000 }); + } + if (logInterval != null) { + long currentTime = System.currentTimeMillis(); + if ((currentTime - logInterval[0]) < logInterval[1]) { + skipLogging = true; + } else { + logInterval[0] = currentTime; + if (logInterval[1] <= (FAILURE_LOG_MAX_INTERVAL / 2)) { + logInterval[1] *= 2; + } + } + } + } + return skipLogging; + } + + + public class LocalLocatorDiscovery implements Runnable { + public void run() { + exchangeLocalLocators(); + } + } + + public class RemoteLocatorDiscovery implements Runnable { + public void run() { + exchangeRemoteLocators(); + } + } + + private WanLocatorDiscoverer getDiscoverer() { + return this.discoverer; + } + + private void exchangeLocalLocators() { + int retryAttempt = 1; + while (!getDiscoverer().isStopped()) { + try { + RemoteLocatorJoinResponse response = (RemoteLocatorJoinResponse)locatorClient + .requestToServer(locatorId.getHost(), locatorId.getPort(), request, + WanLocatorDiscoverer.WAN_LOCATOR_CONNECTION_TIMEOUT); + if (response != null) { + LocatorHelper.addExchangedLocators(response.getLocators(), + this.locatorListener); + logger.info(LocalizedMessage.create(LocalizedStrings.LOCATOR_DISCOVERY_TASK_EXCHANGED_LOCATOR_INFORMATION_0_WITH_1, + new Object[] { request.getLocator(), locatorId, response.getLocators() })); + break; + } + } + catch (IOException ioe) { + if (retryAttempt == WAN_LOCATOR_CONNECTION_RETRY_ATTEMPT) { + ConnectionException coe = new ConnectionException( + "Not able to connect to local locator after " + + WAN_LOCATOR_CONNECTION_RETRY_ATTEMPT + " retry attempts", + ioe); + logger.fatal(LocalizedMessage.create(LocalizedStrings.LOCATOR_DISCOVERY_TASK_COULD_NOT_EXCHANGE_LOCATOR_INFORMATION_0_WITH_1_AFTER_2, + new Object[] { request.getLocator(),locatorId, retryAttempt }), coe); + break; + } + if (skipFailureLogging(locatorId)) { + logger.warn(LocalizedMessage.create(LocalizedStrings.LOCATOR_DISCOVERY_TASK_COULD_NOT_EXCHANGE_LOCATOR_INFORMATION_0_WITH_1_AFTER_2_RETRYING_IN_3_MS, + new Object[] { request.getLocator(), locatorId, retryAttempt, WAN_LOCATOR_CONNECTION_INTERVAL })); + } + try { + Thread.sleep(WAN_LOCATOR_CONNECTION_INTERVAL); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + retryAttempt++; + continue; + } + catch (ClassNotFoundException classNotFoundException) { + logger.fatal(LocalizedMessage.create(LocalizedStrings.LOCATOR_DISCOVERY_TASK_ENCOUNTERED_UNEXPECTED_EXCEPTION), classNotFoundException); + break; + } + } + } + + public void exchangeRemoteLocators() { + int retryAttempt = 1; + DistributionLocatorId remoteLocator = this.locatorId; + while (!getDiscoverer().isStopped()) { + RemoteLocatorJoinResponse response; + try { + response = (RemoteLocatorJoinResponse)locatorClient + .requestToServer(remoteLocator.getHost(), remoteLocator.getPort(), + request, WanLocatorDiscoverer.WAN_LOCATOR_CONNECTION_TIMEOUT); + if (response != null) { + LocatorHelper.addExchangedLocators(response.getLocators(), this.locatorListener); + logger.info(LocalizedMessage.create(LocalizedStrings.LOCATOR_DISCOVERY_TASK_EXCHANGED_LOCATOR_INFORMATION_0_WITH_1, + new Object[] { request.getLocator(), locatorId, response.getLocators() })); + RemoteLocatorPingRequest pingRequest = new RemoteLocatorPingRequest( + ""); + while (true) { + Thread.sleep(WAN_LOCATOR_PING_INTERVAL); + RemoteLocatorPingResponse pingResponse = (RemoteLocatorPingResponse)locatorClient + .requestToServer(remoteLocator.getHost(), + remoteLocator.getPort(), pingRequest, + WanLocatorDiscoverer.WAN_LOCATOR_CONNECTION_TIMEOUT); + if (pingResponse != null) { + continue; + } + break; + } + } + } + catch (IOException ioe) { + if (retryAttempt == WAN_LOCATOR_CONNECTION_RETRY_ATTEMPT) { + logger.fatal(LocalizedMessage.create(LocalizedStrings.LOCATOR_DISCOVERY_TASK_COULD_NOT_EXCHANGE_LOCATOR_INFORMATION_0_WITH_1_AFTER_2, + new Object[] { request.getLocator(), remoteLocator, retryAttempt}), ioe); + break; + } + if (skipFailureLogging(remoteLocator)) { + logger.warn(LocalizedMessage.create(LocalizedStrings.LOCATOR_DISCOVERY_TASK_COULD_NOT_EXCHANGE_LOCATOR_INFORMATION_0_WITH_1_AFTER_2_RETRYING_IN_3_MS, + new Object[] { request.getLocator(), remoteLocator, retryAttempt, WAN_LOCATOR_CONNECTION_INTERVAL })); + } + try { + Thread.sleep(WAN_LOCATOR_CONNECTION_INTERVAL); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + retryAttempt++; + continue; + } + catch (ClassNotFoundException classNotFoundException) { + logger.fatal(LocalizedMessage.create(LocalizedStrings.LOCATOR_DISCOVERY_TASK_ENCOUNTERED_UNEXPECTED_EXCEPTION), classNotFoundException); + break; + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorHelper.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorHelper.java b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorHelper.java new file mode 100644 index 0000000..83b6db3 --- /dev/null +++ b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorHelper.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.cache.client.internal.locator.wan; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import com.gemstone.gemfire.distributed.internal.InternalLocator; +import com.gemstone.gemfire.cache.client.internal.locator.wan.LocatorMembershipListener; +import com.gemstone.gemfire.distributed.internal.WanLocatorDiscoverer; +import com.gemstone.gemfire.internal.CopyOnWriteHashSet; +import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId; +/** + * This is a helper class which helps to add the locator information to the allLocatorInfoMap. + * + * + */ +public class LocatorHelper { + + public final static Object locatorObject = new Object(); + /** + * + * This methods add the given locator to allLocatorInfoMap. + * It also invokes a locatorlistener to inform other locators in allLocatorInfoMap about this newly added locator. + * @param distributedSystemId + * @param locator + * @param locatorListener + * @param sourceLocator + */ + public static boolean addLocator(int distributedSystemId, + DistributionLocatorId locator, LocatorMembershipListener locatorListener, + DistributionLocatorId sourceLocator) { + ConcurrentHashMap<Integer, Set<DistributionLocatorId>> allLocatorsInfo = (ConcurrentHashMap<Integer, Set<DistributionLocatorId>>)locatorListener + .getAllLocatorsInfo(); + Set<DistributionLocatorId> locatorsSet = new CopyOnWriteHashSet<DistributionLocatorId>(); + locatorsSet.add(locator); + Set<DistributionLocatorId> existingValue = allLocatorsInfo.putIfAbsent(distributedSystemId, locatorsSet); + if(existingValue != null){ + if (!existingValue.contains(locator)) { + existingValue.add(locator); + addServerLocator(distributedSystemId, locatorListener, locator); + locatorListener.locatorJoined(distributedSystemId, locator, + sourceLocator); + } + else { + return false; + } + }else{ + addServerLocator(distributedSystemId, locatorListener, locator); + locatorListener.locatorJoined(distributedSystemId, locator, + sourceLocator); + } + return true; + } + + /** + * This methods decides whether the given locator is server locator, if so + * then add this locator in allServerLocatorsInfo map. + * + * @param distributedSystemId + * @param locatorListener + * @param locator + */ + private static void addServerLocator(Integer distributedSystemId, + LocatorMembershipListener locatorListener, DistributionLocatorId locator) { + if (!locator.isServerLocator()) { + return; + } + ConcurrentHashMap<Integer, Set<String>> allServerLocatorsInfo = (ConcurrentHashMap<Integer, Set<String>>)locatorListener + .getAllServerLocatorsInfo(); + + Set<String> locatorsSet = new CopyOnWriteHashSet<String>(); + locatorsSet.add(locator.toString()); + Set<String> existingValue = allServerLocatorsInfo.putIfAbsent(distributedSystemId, locatorsSet); + if(existingValue != null){ + if (!existingValue.contains(locator.toString())) { + existingValue.add(locator.toString()); + } + } + } + + /** + * This method adds the map of locatorsinfo sent by other locator to this locator's allLocatorInfo + * + * @param locators + * @param locatorListener + */ + public static boolean addExchangedLocators(Map<Integer, Set<DistributionLocatorId>> locators, + LocatorMembershipListener locatorListener) { + + ConcurrentHashMap<Integer, Set<DistributionLocatorId>> allLocators = (ConcurrentHashMap<Integer, Set<DistributionLocatorId>>)locatorListener + .getAllLocatorsInfo(); + if (!allLocators.equals(locators)) { + for (Map.Entry<Integer, Set<DistributionLocatorId>> entry : locators + .entrySet()) { + Set<DistributionLocatorId> existingValue = allLocators.putIfAbsent( + entry.getKey(), new CopyOnWriteHashSet<DistributionLocatorId>(entry + .getValue())); + + if (existingValue != null) { + Set<DistributionLocatorId> localLocators = allLocators.get(entry + .getKey()); + if (!localLocators.equals(entry.getValue())) { + entry.getValue().removeAll(localLocators); + for (DistributionLocatorId locator : entry.getValue()) { + localLocators.add(locator); + addServerLocator(entry.getKey(), locatorListener, locator); + locatorListener.locatorJoined(entry.getKey(), locator, null); + } + } + + } + else { + for (DistributionLocatorId locator : entry.getValue()) { + addServerLocator(entry.getKey(), locatorListener, locator); + locatorListener.locatorJoined(entry.getKey(), locator, null); + } + } + } + return true; + } + return false; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorJoinMessage.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorJoinMessage.java b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorJoinMessage.java new file mode 100644 index 0000000..86ae0d6 --- /dev/null +++ b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorJoinMessage.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.cache.client.internal.locator.wan; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import com.gemstone.gemfire.DataSerializer; +import com.gemstone.gemfire.cache.client.internal.locator.ServerLocationRequest; +import com.gemstone.gemfire.internal.DataSerializableFixedID; +import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId; + +public class LocatorJoinMessage extends ServerLocationRequest { + + private DistributionLocatorId locator; + + private int distributedSystemId; + + private DistributionLocatorId sourceLocator; + + public LocatorJoinMessage() { + super(); + } + + public LocatorJoinMessage(int distributedSystemId, DistributionLocatorId locator, + DistributionLocatorId sourceLocator, String serverGroup) { + super(serverGroup); + this.locator = locator; + this.distributedSystemId = distributedSystemId; + this.sourceLocator = sourceLocator; + } + + public void fromData(DataInput in) throws IOException, ClassNotFoundException { + super.fromData(in); + this.locator = DataSerializer.readObject(in); + this.distributedSystemId = in.readInt(); + this.sourceLocator = DataSerializer.readObject(in); + } + + public void toData(DataOutput out) throws IOException { + super.toData(out); + DataSerializer.writeObject(locator, out); + out.writeInt(this.distributedSystemId); + DataSerializer.writeObject(sourceLocator, out); + } + + public DistributionLocatorId getLocator() { + return this.locator; + } + + public int getDistributedSystemId() { + return distributedSystemId; + } + + public DistributionLocatorId getSourceLocator() { + return sourceLocator; + } + + public int getDSFID() { + return DataSerializableFixedID.LOCATOR_JOIN_MESSAGE; + } + + @Override + public String toString() { + return "LocatorJoinMessage{distributedSystemId="+ distributedSystemId +" locators=" + locator + " Source Locator : " + sourceLocator +"}"; + } + + @Override + public boolean equals(Object obj){ + if ( this == obj ) return true; + if ( !(obj instanceof LocatorJoinMessage) ) return false; + LocatorJoinMessage myObject = (LocatorJoinMessage)obj; + if((this.distributedSystemId == myObject.getDistributedSystemId()) && this.locator.equals(myObject.getLocator())){ + return true; + } + return false; + } + + @Override + public int hashCode() { + // it is sufficient for all messages having the same locator to hash to the same bucket + if (this.locator == null) { + return 0; + } else { + return this.locator.hashCode(); + } + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerImpl.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerImpl.java b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerImpl.java new file mode 100644 index 0000000..01e01e7 --- /dev/null +++ b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerImpl.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.cache.client.internal.locator.wan; + +import com.gemstone.gemfire.distributed.internal.DistributionConfig; +import com.gemstone.gemfire.distributed.internal.tcpserver.TcpClient; +import com.gemstone.gemfire.internal.CopyOnWriteHashSet; +import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId; +import com.gemstone.gemfire.internal.i18n.LocalizedStrings; +import com.gemstone.gemfire.internal.logging.LogService; +import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * An implementation of + * {@link com.gemstone.gemfire.cache.client.internal.locator.wan.LocatorMembershipListener} + * + * + */ +public class LocatorMembershipListenerImpl implements LocatorMembershipListener { + + private ConcurrentMap<Integer, Set<DistributionLocatorId>> allLocatorsInfo = new ConcurrentHashMap<Integer, Set<DistributionLocatorId>>(); + + private ConcurrentMap<Integer, Set<String>> allServerLocatorsInfo = new ConcurrentHashMap<Integer, Set<String>>(); + + private static final Logger logger = LogService.getLogger(); + + private DistributionConfig config; + + private TcpClient tcpClient; + + private int port; + + public LocatorMembershipListenerImpl() { + this.tcpClient = new TcpClient(); + } + + public void setPort(int port){ + this.port = port; + } + + public void setConfig(DistributionConfig config) { + this.config = config; + } + + /** + * When the new locator is added to remote locator metadata, inform all other + * locators in remote locator metadata about the new locator so that they can + * update their remote locator metadata. + * + * @param locator + */ + + public void locatorJoined(final int distributedSystemId, + final DistributionLocatorId locator, + final DistributionLocatorId sourceLocator) { + Thread distributeLocator = new Thread(new Runnable() { + public void run() { + ConcurrentMap<Integer, Set<DistributionLocatorId>> remoteLocators = getAllLocatorsInfo(); + ArrayList<DistributionLocatorId> locatorsToRemove = new ArrayList<DistributionLocatorId>(); + + String localLocator = config.getStartLocator(); + DistributionLocatorId localLocatorId = null; + if (localLocator.equals(DistributionConfig.DEFAULT_START_LOCATOR)) { + localLocatorId = new DistributionLocatorId(port, config + .getBindAddress()); + } + else { + localLocatorId = new DistributionLocatorId(localLocator); + } + locatorsToRemove.add(localLocatorId); + locatorsToRemove.add(locator); + locatorsToRemove.add(sourceLocator); + + Map<Integer, Set<DistributionLocatorId>> localCopy = new HashMap<Integer, Set<DistributionLocatorId>>(); + for(Map.Entry<Integer, Set<DistributionLocatorId>> entry : remoteLocators.entrySet()){ + Set<DistributionLocatorId> value = new CopyOnWriteHashSet<DistributionLocatorId>(entry.getValue()); + localCopy.put(entry.getKey(), value); + } + for(Map.Entry<Integer, Set<DistributionLocatorId>> entry : localCopy.entrySet()){ + for(DistributionLocatorId removeLocId : locatorsToRemove){ + if(entry.getValue().contains(removeLocId)){ + entry.getValue().remove(removeLocId); + } + } + for (DistributionLocatorId value : entry.getValue()) { + try { + tcpClient.requestToServer(value.getHost(), value.getPort(), + new LocatorJoinMessage(distributedSystemId, locator, localLocatorId, ""), 1000, false); + } + catch (Exception e) { + if (logger.isDebugEnabled()) { + logger.debug(LocalizedMessage.create(LocalizedStrings.LOCATOR_MEMBERSHIP_LISTENER_COULD_NOT_EXCHANGE_LOCATOR_INFORMATION_0_1_WIHT_2_3, + new Object[] { locator.getHost(), locator.getPort(), value.getHost(), value.getPort() })); + } + } + try { + tcpClient.requestToServer(locator.getHost(), locator.getPort(), + new LocatorJoinMessage(entry.getKey(), value, localLocatorId, ""), 1000, false); + } + catch (Exception e) { + if (logger.isDebugEnabled()) { + logger.debug(LocalizedMessage.create(LocalizedStrings.LOCATOR_MEMBERSHIP_LISTENER_COULD_NOT_EXCHANGE_LOCATOR_INFORMATION_0_1_WIHT_2_3, + new Object[] { value.getHost(), value.getPort(), locator.getHost(), locator.getPort() })); + } + } + } + } + } + }); + distributeLocator.setDaemon(true); + distributeLocator.start(); + } + + public Object handleRequest(Object request) { + Object response = null; + if (request instanceof RemoteLocatorJoinRequest) { + response = updateAllLocatorInfo((RemoteLocatorJoinRequest)request); + } + else if (request instanceof LocatorJoinMessage) { + response = informAboutRemoteLocators((LocatorJoinMessage)request); + } + else if (request instanceof RemoteLocatorPingRequest) { + response = getPingResponse((RemoteLocatorPingRequest)request); + } + else if (request instanceof RemoteLocatorRequest) { + response = getRemoteLocators((RemoteLocatorRequest)request); + } + return response; + } + + /** + * A locator from the request is checked against the existing remote locator + * metadata. If it is not available then added to existing remote locator + * metadata and LocatorMembershipListener is invoked to inform about the + * this newly added locator to all other locators available in remote locator + * metadata. As a response, remote locator metadata is sent. + * + * @param request + */ + private synchronized Object updateAllLocatorInfo(RemoteLocatorJoinRequest request) { + int distributedSystemId = request.getDistributedSystemId(); + DistributionLocatorId locator = request.getLocator(); + + LocatorHelper.addLocator(distributedSystemId, locator, this, null); + return new RemoteLocatorJoinResponse(this.getAllLocatorsInfo()); + } + + private Object getPingResponse(RemoteLocatorPingRequest request) { + return new RemoteLocatorPingResponse(); + } + + private Object informAboutRemoteLocators(LocatorJoinMessage request){ + // TODO: FInd out the importance of list locatorJoinMessages. During + // refactoring I could not understand its significance +// synchronized (locatorJoinObject) { +// if (locatorJoinMessages.contains(request)) { +// return null; +// } +// locatorJoinMessages.add(request); +// } + int distributedSystemId = request.getDistributedSystemId(); + DistributionLocatorId locator = request.getLocator(); + DistributionLocatorId sourceLocatorId = request.getSourceLocator(); + + LocatorHelper.addLocator(distributedSystemId, locator, this, sourceLocatorId); + return null; + } + + private Object getRemoteLocators(RemoteLocatorRequest request) { + int dsId = request.getDsId(); + Set<String> locators = this.getRemoteLocatorInfo(dsId); + return new RemoteLocatorResponse(locators); + } + + public Set<String> getRemoteLocatorInfo(int dsId) { + return this.allServerLocatorsInfo.get(dsId); + } + + public ConcurrentMap<Integer,Set<DistributionLocatorId>> getAllLocatorsInfo() { + return this.allLocatorsInfo; + } + + public ConcurrentMap<Integer,Set<String>> getAllServerLocatorsInfo() { + return this.allServerLocatorsInfo; + } + + public void clearLocatorInfo(){ + allLocatorsInfo.clear(); + allServerLocatorsInfo.clear(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorJoinRequest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorJoinRequest.java b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorJoinRequest.java new file mode 100644 index 0000000..c058333 --- /dev/null +++ b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorJoinRequest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.cache.client.internal.locator.wan; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import com.gemstone.gemfire.DataSerializer; +import com.gemstone.gemfire.cache.client.internal.locator.ServerLocationRequest; +import com.gemstone.gemfire.internal.DataSerializableFixedID; +import com.gemstone.gemfire.internal.Version; +import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId; + +/** + * Requests remote locators of a remote WAN site + * + * + * @since GemFire 6.6 + * + */ +public class RemoteLocatorJoinRequest implements DataSerializableFixedID { + + private DistributionLocatorId locator = null; + + private int distributedSystemId = -1; + + public RemoteLocatorJoinRequest() { + super(); + } + + public RemoteLocatorJoinRequest(int distributedSystemId, DistributionLocatorId locator, + String serverGroup) { + this.distributedSystemId = distributedSystemId; + this.locator = locator; + } + + public void fromData(DataInput in) throws IOException, ClassNotFoundException { + this.locator = DataSerializer.readObject(in); + this.distributedSystemId = in.readInt(); + } + + public void toData(DataOutput out) throws IOException { + DataSerializer.writeObject(locator, out); + out.writeInt(this.distributedSystemId); + } + + public DistributionLocatorId getLocator() { + return this.locator; + } + + public int getDistributedSystemId() { + return distributedSystemId; + } + + public int getDSFID() { + return DataSerializableFixedID.REMOTE_LOCATOR_JOIN_REQUEST; + } + + @Override + public String toString() { + return "RemoteLocatorJoinRequest{locator=" + locator + "}"; + } + + @Override + public Version[] getSerializationVersions() { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorJoinResponse.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorJoinResponse.java b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorJoinResponse.java new file mode 100644 index 0000000..42c3bb0 --- /dev/null +++ b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorJoinResponse.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.cache.client.internal.locator.wan; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import com.gemstone.gemfire.DataSerializer; +import com.gemstone.gemfire.internal.CopyOnWriteHashSet; +import com.gemstone.gemfire.internal.DataSerializableFixedID; +import com.gemstone.gemfire.internal.Version; +import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId; + +/** + * List of remote locators as a response + * + * + * + */ +public class RemoteLocatorJoinResponse implements DataSerializableFixedID{ + + private HashMap<Integer, Set<DistributionLocatorId>> locators = new HashMap<Integer, Set<DistributionLocatorId>>(); + + /** Used by DataSerializer */ + public RemoteLocatorJoinResponse() { + super(); + } + + public RemoteLocatorJoinResponse( + Map<Integer, Set<DistributionLocatorId>> locators) { + super(); + this.locators = new HashMap<Integer, Set<DistributionLocatorId>>(); + for (Map.Entry<Integer, Set<DistributionLocatorId>> entry : locators + .entrySet()) { + this.locators.put(entry.getKey(), new CopyOnWriteHashSet<DistributionLocatorId>( + entry.getValue())); + } + } + + public void fromData(DataInput in) throws IOException, ClassNotFoundException { + this.locators = DataSerializer.readHashMap(in); + + } + + public void toData(DataOutput out) throws IOException { + DataSerializer.writeHashMap(locators, out); + } + + public Map<Integer, Set<DistributionLocatorId>> getLocators() { + return this.locators; + } + + @Override + public String toString() { + return "RemoteLocatorJoinResponse{locators=" + locators + "}"; + } + + public int getDSFID() { + return DataSerializableFixedID.REMOTE_LOCATOR_JOIN_RESPONSE; + } + + @Override + public Version[] getSerializationVersions() { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorPingRequest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorPingRequest.java b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorPingRequest.java new file mode 100644 index 0000000..a1cb951 --- /dev/null +++ b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorPingRequest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.cache.client.internal.locator.wan; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import com.gemstone.gemfire.internal.DataSerializableFixedID; +import com.gemstone.gemfire.internal.Version; + +/** + * + * + */ + +public class RemoteLocatorPingRequest implements DataSerializableFixedID{ + + public RemoteLocatorPingRequest() { + super(); + } + + public RemoteLocatorPingRequest(String serverGroup) { + } + + public void fromData(DataInput in) throws IOException, ClassNotFoundException { + } + + public void toData(DataOutput out) throws IOException { + } + + public int getDSFID() { + return DataSerializableFixedID.REMOTE_LOCATOR_PING_REQUEST; + } + + @Override + public Version[] getSerializationVersions() { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorPingResponse.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorPingResponse.java b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorPingResponse.java new file mode 100644 index 0000000..54fdc04 --- /dev/null +++ b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorPingResponse.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.cache.client.internal.locator.wan; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import com.gemstone.gemfire.internal.DataSerializableFixedID; +import com.gemstone.gemfire.internal.Version; + +/** + * + */ +public class RemoteLocatorPingResponse implements DataSerializableFixedID { + + + /** Used by DataSerializer */ + public RemoteLocatorPingResponse() { + super(); + } + + public void fromData(DataInput in) throws IOException, ClassNotFoundException { + } + + public void toData(DataOutput out) throws IOException { + } + + + + public int getDSFID() { + return DataSerializableFixedID.REMOTE_LOCATOR_PING_RESPONSE; + } + + @Override + public Version[] getSerializationVersions() { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorRequest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorRequest.java b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorRequest.java new file mode 100644 index 0000000..d23ca93 --- /dev/null +++ b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorRequest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.cache.client.internal.locator.wan; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import com.gemstone.gemfire.internal.DataSerializableFixedID; +import com.gemstone.gemfire.internal.Version; +/** + * + * + */ +public class RemoteLocatorRequest implements DataSerializableFixedID{ + private int distributedSystemId ; + + public RemoteLocatorRequest() { + super(); + } + public RemoteLocatorRequest(int dsId, String serverGroup) { + this.distributedSystemId = dsId; + } + + public void fromData(DataInput in) throws IOException, ClassNotFoundException { + this.distributedSystemId = in.readInt(); + } + + public void toData(DataOutput out) throws IOException { + out.writeInt(this.distributedSystemId); + } + + public int getDsId() { + return this.distributedSystemId; + } + + public int getDSFID() { + return DataSerializableFixedID.REMOTE_LOCATOR_REQUEST; + } + + @Override + public String toString() { + return "RemoteLocatorRequest{dsName=" + distributedSystemId + "}"; + } + @Override + public Version[] getSerializationVersions() { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorResponse.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorResponse.java b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorResponse.java new file mode 100644 index 0000000..a7ff6fd --- /dev/null +++ b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/RemoteLocatorResponse.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.cache.client.internal.locator.wan; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Set; + +import com.gemstone.gemfire.DataSerializer; +import com.gemstone.gemfire.internal.DataSerializableFixedID; +import com.gemstone.gemfire.internal.Version; + +/** + * + * + */ +public class RemoteLocatorResponse implements DataSerializableFixedID{ + + private Set<String> locators ; + + /** Used by DataSerializer */ + public RemoteLocatorResponse() { + super(); + } + + public RemoteLocatorResponse(Set<String> locators) { + this.locators = locators; + } + + public void fromData(DataInput in) throws IOException, ClassNotFoundException { + this.locators = DataSerializer.readObject(in); + } + + public void toData(DataOutput out) throws IOException { + DataSerializer.writeObject(this.locators, out); + } + + public Set<String> getLocators() { + return this.locators; + } + + @Override + public String toString() { + return "RemoteLocatorResponse{locators=" + locators +"}"; + } + + public int getDSFID() { + return DataSerializableFixedID.REMOTE_LOCATOR_RESPONSE; + } + + @Override + public Version[] getSerializationVersions() { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/WANFactoryImpl.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/WANFactoryImpl.java b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/WANFactoryImpl.java new file mode 100644 index 0000000..2844594 --- /dev/null +++ b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/WANFactoryImpl.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.cache.client.internal.locator.wan; + +import com.gemstone.gemfire.cache.Cache; +import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory; +import com.gemstone.gemfire.cache.wan.GatewaySenderFactory; +import com.gemstone.gemfire.distributed.internal.WanLocatorDiscoverer; +import com.gemstone.gemfire.internal.DSFIDFactory; +import com.gemstone.gemfire.internal.DataSerializableFixedID; +import com.gemstone.gemfire.internal.cache.wan.GatewayReceiverFactoryImpl; +import com.gemstone.gemfire.internal.cache.wan.GatewaySenderFactoryImpl; +import com.gemstone.gemfire.internal.cache.wan.spi.WANFactory; + +public class WANFactoryImpl implements WANFactory { + + @Override + public void initialize() { + DSFIDFactory.registerDSFID( + DataSerializableFixedID.REMOTE_LOCATOR_JOIN_REQUEST, + RemoteLocatorJoinRequest.class); + DSFIDFactory.registerDSFID( + DataSerializableFixedID.REMOTE_LOCATOR_JOIN_RESPONSE, + RemoteLocatorJoinResponse.class); + DSFIDFactory.registerDSFID(DataSerializableFixedID.REMOTE_LOCATOR_REQUEST, + RemoteLocatorRequest.class); + DSFIDFactory.registerDSFID(DataSerializableFixedID.LOCATOR_JOIN_MESSAGE, + LocatorJoinMessage.class); + DSFIDFactory.registerDSFID( + DataSerializableFixedID.REMOTE_LOCATOR_PING_REQUEST, + RemoteLocatorPingRequest.class); + DSFIDFactory.registerDSFID( + DataSerializableFixedID.REMOTE_LOCATOR_PING_RESPONSE, + RemoteLocatorPingResponse.class); + DSFIDFactory.registerDSFID(DataSerializableFixedID.REMOTE_LOCATOR_RESPONSE, + RemoteLocatorResponse.class); + } + + @Override + public GatewaySenderFactory createGatewaySenderFactory(Cache cache) { + return new GatewaySenderFactoryImpl(cache); + } + + @Override + public GatewayReceiverFactory createGatewayReceiverFactory(Cache cache) { + return new GatewayReceiverFactoryImpl(cache); + } + + @Override + public WanLocatorDiscoverer createLocatorDiscoverer() { + return new WanLocatorDiscovererImpl(); + } + + @Override + public LocatorMembershipListener createLocatorMembershipListener() { + return new LocatorMembershipListenerImpl(); + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/WanLocatorDiscovererImpl.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/WanLocatorDiscovererImpl.java b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/WanLocatorDiscovererImpl.java new file mode 100644 index 0000000..1d44e65 --- /dev/null +++ b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/WanLocatorDiscovererImpl.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.cache.client.internal.locator.wan; + +import com.gemstone.gemfire.distributed.internal.DistributionConfig; +import com.gemstone.gemfire.distributed.internal.DistributionConfigImpl; +import com.gemstone.gemfire.distributed.internal.WanLocatorDiscoverer; +import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId; +import com.gemstone.gemfire.internal.logging.LogService; +import com.gemstone.gemfire.internal.logging.LoggingThreadGroup; +import org.apache.logging.log4j.Logger; + +import java.util.StringTokenizer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; + +public class WanLocatorDiscovererImpl implements WanLocatorDiscoverer{ + + private static final Logger logger = LogService.getLogger(); + + private volatile boolean stopped = false; + + private ExecutorService _executor; + + public WanLocatorDiscovererImpl() { + + } + + @Override + public void discover(int port, + DistributionConfigImpl config, + LocatorMembershipListener locatorListener, + final String hostnameForClients) { + final LoggingThreadGroup loggingThreadGroup = LoggingThreadGroup + .createThreadGroup("WAN Locator Discovery Logger Group", logger); + + final ThreadFactory threadFactory = new ThreadFactory() { + public Thread newThread(final Runnable task) { + final Thread thread = new Thread(loggingThreadGroup, task, + "WAN Locator Discovery Thread"); + thread.setDaemon(true); + return thread; + } + }; + + this._executor = Executors.newCachedThreadPool(threadFactory); + exchangeLocalLocators(port, config, locatorListener, hostnameForClients); + exchangeRemoteLocators(port, config, locatorListener, hostnameForClients); + this._executor.shutdown(); + } + + @Override + public void stop() { + this.stopped = true; + } + + @Override + public boolean isStopped() { + return this.stopped; + } + + /** + * For WAN 70 Exchange the locator information within the distributed system + * + * @param config + * @param hostnameForClients + */ + private void exchangeLocalLocators(int port, + DistributionConfigImpl config, + LocatorMembershipListener locatorListener, + final String hostnameForClients) { + String localLocator = config.getStartLocator(); + DistributionLocatorId locatorId = null; + if (localLocator.equals(DistributionConfig.DEFAULT_START_LOCATOR)) { + locatorId = new DistributionLocatorId(port, config.getBindAddress(), hostnameForClients); + } + else { + locatorId = new DistributionLocatorId(localLocator); + } + LocatorHelper.addLocator(config.getDistributedSystemId(), locatorId, locatorListener, null); + + RemoteLocatorJoinRequest request = buildRemoteDSJoinRequest(port, config, hostnameForClients); + StringTokenizer locatorsOnThisVM = new StringTokenizer( + config.getLocators(), ","); + while (locatorsOnThisVM.hasMoreTokens()) { + DistributionLocatorId localLocatorId = new DistributionLocatorId( + locatorsOnThisVM.nextToken()); + if (!locatorId.equals(localLocatorId)) { + LocatorDiscovery localDiscovery = new LocatorDiscovery(this, localLocatorId, request, locatorListener); + LocatorDiscovery.LocalLocatorDiscovery localLocatorDiscovery = localDiscovery.new LocalLocatorDiscovery(); + this._executor.execute(localLocatorDiscovery); + } + } + } + + /** + * For WAN 70 Exchange the locator information across the distributed systems + * (sites) + * + * @param config + * @param hostnameForClients + */ + private void exchangeRemoteLocators(int port, + DistributionConfigImpl config, + LocatorMembershipListener locatorListener, + final String hostnameForClients) { + RemoteLocatorJoinRequest request = buildRemoteDSJoinRequest(port, config, hostnameForClients); + String remoteDistributedSystems = config.getRemoteLocators(); + if (remoteDistributedSystems.length() > 0) { + StringTokenizer remoteLocators = new StringTokenizer( + remoteDistributedSystems, ","); + while (remoteLocators.hasMoreTokens()) { + DistributionLocatorId remoteLocatorId = new DistributionLocatorId( + remoteLocators.nextToken()); + LocatorDiscovery localDiscovery = new LocatorDiscovery(this, remoteLocatorId, + request, locatorListener); + LocatorDiscovery.RemoteLocatorDiscovery remoteLocatorDiscovery = localDiscovery.new RemoteLocatorDiscovery(); + this._executor.execute(remoteLocatorDiscovery); + } + } + } + + private RemoteLocatorJoinRequest buildRemoteDSJoinRequest(int port, + DistributionConfigImpl config, + final String hostnameForClients) { + String localLocator = config.getStartLocator(); + DistributionLocatorId locatorId = null; + if (localLocator.equals(DistributionConfig.DEFAULT_START_LOCATOR)) { + locatorId = new DistributionLocatorId(port, config.getBindAddress(), hostnameForClients); + } + else { + locatorId = new DistributionLocatorId(localLocator); + } + RemoteLocatorJoinRequest request = new RemoteLocatorJoinRequest( + config.getDistributedSystemId(), locatorId, ""); + return request; + } + +}
