GEODE-37 Change package name from com.gemstone.gemfire (for ./geode-wan/src/main/java/com/gemstone/gemfire) to org.apache.geode (for ./geode-wan/src/main/java/org/apache/geode)
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/701c6861 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/701c6861 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/701c6861 Branch: refs/heads/feature/GEODE-37_2 Commit: 701c686150bbeaa8323937fc18695c245baf2708 Parents: 9b1ec72 Author: Hitesh Khamesra <[email protected]> Authored: Tue Sep 13 15:43:20 2016 -0700 Committer: Hitesh Khamesra <[email protected]> Committed: Tue Sep 13 15:43:20 2016 -0700 ---------------------------------------------------------------------- .../client/internal/GatewaySenderBatchOp.java | 312 -------- .../cache/client/internal/SenderProxy.java | 42 - .../internal/locator/wan/LocatorDiscovery.java | 234 ------ .../internal/locator/wan/LocatorHelper.java | 142 ---- .../locator/wan/LocatorJoinMessage.java | 105 --- .../wan/LocatorMembershipListenerImpl.java | 213 ----- .../locator/wan/RemoteLocatorJoinRequest.java | 84 -- .../locator/wan/RemoteLocatorJoinResponse.java | 86 -- .../locator/wan/RemoteLocatorPingRequest.java | 55 -- .../locator/wan/RemoteLocatorPingResponse.java | 54 -- .../locator/wan/RemoteLocatorRequest.java | 63 -- .../locator/wan/RemoteLocatorResponse.java | 71 -- .../internal/locator/wan/WANFactoryImpl.java | 74 -- .../locator/wan/WanLocatorDiscovererImpl.java | 154 ---- .../cache/wan/AbstractRemoteGatewaySender.java | 166 ---- .../cache/wan/GatewayReceiverFactoryImpl.java | 145 ---- .../internal/cache/wan/GatewayReceiverImpl.java | 251 ------ .../wan/GatewaySenderEventRemoteDispatcher.java | 802 ------------------- .../cache/wan/GatewaySenderFactoryImpl.java | 382 --------- .../wan/parallel/ParallelGatewaySenderImpl.java | 263 ------ ...rentParallelGatewaySenderEventProcessor.java | 66 -- ...moteParallelGatewaySenderEventProcessor.java | 122 --- ...urrentSerialGatewaySenderEventProcessor.java | 45 -- ...RemoteSerialGatewaySenderEventProcessor.java | 
50 -- .../wan/serial/SerialGatewaySenderImpl.java | 256 ------ .../client/internal/GatewaySenderBatchOp.java | 312 ++++++++ .../cache/client/internal/SenderProxy.java | 42 + .../internal/locator/wan/LocatorDiscovery.java | 234 ++++++ .../internal/locator/wan/LocatorHelper.java | 142 ++++ .../locator/wan/LocatorJoinMessage.java | 105 +++ .../wan/LocatorMembershipListenerImpl.java | 213 +++++ .../locator/wan/RemoteLocatorJoinRequest.java | 84 ++ .../locator/wan/RemoteLocatorJoinResponse.java | 86 ++ .../locator/wan/RemoteLocatorPingRequest.java | 55 ++ .../locator/wan/RemoteLocatorPingResponse.java | 54 ++ .../locator/wan/RemoteLocatorRequest.java | 63 ++ .../locator/wan/RemoteLocatorResponse.java | 71 ++ .../internal/locator/wan/WANFactoryImpl.java | 74 ++ .../locator/wan/WanLocatorDiscovererImpl.java | 154 ++++ .../cache/wan/AbstractRemoteGatewaySender.java | 166 ++++ .../cache/wan/GatewayReceiverFactoryImpl.java | 145 ++++ .../internal/cache/wan/GatewayReceiverImpl.java | 251 ++++++ .../wan/GatewaySenderEventRemoteDispatcher.java | 802 +++++++++++++++++++ .../cache/wan/GatewaySenderFactoryImpl.java | 382 +++++++++ .../wan/parallel/ParallelGatewaySenderImpl.java | 263 ++++++ ...rentParallelGatewaySenderEventProcessor.java | 66 ++ ...moteParallelGatewaySenderEventProcessor.java | 122 +++ ...urrentSerialGatewaySenderEventProcessor.java | 45 ++ ...RemoteSerialGatewaySenderEventProcessor.java | 50 ++ .../wan/serial/SerialGatewaySenderImpl.java | 256 ++++++ 50 files changed, 4237 insertions(+), 4237 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/GatewaySenderBatchOp.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/GatewaySenderBatchOp.java 
b/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/GatewaySenderBatchOp.java deleted file mode 100755 index b042da0..0000000 --- a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/GatewaySenderBatchOp.java +++ /dev/null @@ -1,312 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.gemstone.gemfire.cache.client.internal; - -import com.gemstone.gemfire.InternalGemFireError; -import com.gemstone.gemfire.cache.client.ServerOperationException; -import com.gemstone.gemfire.internal.Version; -import com.gemstone.gemfire.internal.cache.EventID; -import com.gemstone.gemfire.internal.cache.tier.MessageType; -import com.gemstone.gemfire.internal.cache.tier.sockets.ChunkedMessage; -import com.gemstone.gemfire.internal.cache.tier.sockets.Message; -import com.gemstone.gemfire.internal.cache.tier.sockets.Part; -import com.gemstone.gemfire.internal.cache.wan.BatchException70; -import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl; -import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventRemoteDispatcher; -import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventRemoteDispatcher.GatewayAck; -import com.gemstone.gemfire.internal.i18n.LocalizedStrings; -import com.gemstone.gemfire.internal.logging.LogService; - -import java.net.SocketTimeoutException; -import java.util.Iterator; -import java.util.List; - -import org.apache.logging.log4j.Logger; - -@SuppressWarnings("unchecked") -public class GatewaySenderBatchOp { - - private static final Logger logger = LogService.getLogger(); - - /** - * Send a list of gateway events to a server to execute - * using connections from the given pool - * to communicate with the server. - * @param con the connection to send the message on. - * @param pool the pool to use to communicate with the server. - * @param events list of gateway events - * @param batchId the ID of this batch - */ - public static void executeOn(Connection con, ExecutablePool pool, List events, int batchId, boolean isRetry) - { - AbstractOp op = null; - //System.out.println("Version: "+con.getWanSiteVersion()); - //Is this check even needed anymore? It looks like we just create the same exact op impl with the same parameters... 
- if (Version.GFE_651.compareTo(con.getWanSiteVersion()) >= 0) { - op = new GatewaySenderGFEBatchOpImpl(events, batchId, con.getDistributedSystemId(), isRetry); - } else { - // Default should create a batch of server version (ACCEPTOR.VERSION) - op = new GatewaySenderGFEBatchOpImpl(events, batchId, con.getDistributedSystemId(), isRetry); - } - pool.executeOn(con, op, true/*timeoutFatal*/); - } - - - public static Object executeOn(Connection con, ExecutablePool pool) - { - AbstractOp op = new GatewaySenderGFEBatchOpImpl(); - return pool.executeOn(con, op, true/*timeoutFatal*/); - } - - private GatewaySenderBatchOp() { - // no instances allowed - } - - static class GatewaySenderGFEBatchOpImpl extends AbstractOp { - - /** - * @throws com.gemstone.gemfire.SerializationException if serialization fails - */ - public GatewaySenderGFEBatchOpImpl(List events, int batchId, int dsId, boolean isRetry) { - super(MessageType.GATEWAY_RECEIVER_COMMAND, calcPartCount(events)); - boolean removeFromQueueOnException = true; - if (isRetry) { - getMessage().setIsRetry(); - } - getMessage().addIntPart(events.size()); - getMessage().addIntPart(batchId); - getMessage().addIntPart(dsId); - getMessage().addBytesPart( - new byte[] { removeFromQueueOnException ? 
(byte)1 : (byte)0 }); - // Add each event - for (Iterator i = events.iterator(); i.hasNext();) { - GatewaySenderEventImpl event = (GatewaySenderEventImpl)i.next(); - // Add action - int action = event.getAction(); - getMessage().addIntPart(action); - { // Add posDup flag - byte posDupByte = (byte)(event.getPossibleDuplicate()?0x01:0x00); - getMessage().addBytesPart(new byte[] {posDupByte}); - } - if (action >= 0 && action <= 3) { - // 0 = create - // 1 = update - // 2 = destroy - String regionName = event.getRegionPath(); - EventID eventId = event.getEventId(); - Object key = event.getKey(); - Object callbackArg = event.getSenderCallbackArgument(); - - // Add region name - getMessage().addStringPart(regionName); - // Add event id - getMessage().addObjPart(eventId); - // Add key - getMessage().addStringOrObjPart(key); - if (action < 2 /* it is 0 or 1 */) { - byte[] value = event.getSerializedValue(); - byte valueIsObject = event.getValueIsObject();; - // Add value (which is already a serialized byte[]) - getMessage().addRawPart(value, (valueIsObject == 0x01)); - } - // Add callback arg if necessary - if (callbackArg == null) { - getMessage().addBytesPart(new byte[] {0x00}); - } else { - getMessage().addBytesPart(new byte[] {0x01}); - getMessage().addObjPart(callbackArg); - } - getMessage().addLongPart(event.getVersionTimeStamp()); - } - } - } - - public GatewaySenderGFEBatchOpImpl() { - super(MessageType.GATEWAY_RECEIVER_COMMAND, 0); - } - - @Override - public Object attempt(Connection cnx) throws Exception { - if (getMessage().getNumberOfParts() == 0) { - return attemptRead(cnx); - } - this.failed = true; - this.timedOut = false; - long start = startAttempt(cnx.getStats()); - try { - try { - attemptSend(cnx); - this.failed = false; - } finally { - endSendAttempt(cnx.getStats(), start); - } - } finally { - endAttempt(cnx.getStats(), start); - } - return this.failed; - } - - private Object attemptRead(Connection cnx) throws Exception { - this.failed = true; - try { - 
Object result = attemptReadResponse(cnx); - this.failed = false; - return result; - } catch (SocketTimeoutException ste) { - this.failed = false; - this.timedOut = true; - throw ste; - } catch (Exception e) { - throw e; - } - } - - - /** - * Attempts to read a response to this operation by reading it from the - * given connection, and returning it. - * @param cnx the connection to read the response from - * @return the result of the operation - * or <code>null</code> if the operation has no result. - * @throws Exception if the execute failed - */ - protected Object attemptReadResponse(Connection cnx) throws Exception { - Message msg = createResponseMessage(); - if (msg != null) { - msg.setComms(cnx.getSocket(), cnx.getInputStream(), - cnx.getOutputStream(), - ((ConnectionImpl)cnx).getCommBufferForAsyncRead(), cnx.getStats()); - if (msg instanceof ChunkedMessage) { - try { - return processResponse(msg, cnx); - } finally { - msg.unsetComms(); - // TODO (ashetkar) Handle the case when we fail to read the - // connection id. 
- processSecureBytes(cnx, msg); - } - } - - try { - msg.recv(); - } finally { - msg.unsetComms(); - processSecureBytes(cnx, msg); - } - return processResponse(msg, cnx); - } - - return null; - } - - - private static int calcPartCount(List events) { - int numberOfParts = 4; // for the number of events and the batchId - for (Iterator i = events.iterator(); i.hasNext();) { - GatewaySenderEventImpl event = (GatewaySenderEventImpl)i.next(); - numberOfParts += event.getNumberOfParts(); - } - return numberOfParts; - } - - @Override - protected void processSecureBytes(Connection cnx, Message message) - throws Exception { - } - - @Override - protected boolean needsUserId() { - return false; - } - - @Override - protected void sendMessage(Connection cnx) throws Exception { - getMessage().clearMessageHasSecurePartFlag(); - getMessage().send(false); - } - - @Override - protected Object processResponse(Message msg) throws Exception { - GatewayAck ack = null; - try { - // Read the header which describes the type of message following - switch (msg.getMessageType()) { - case MessageType.REPLY: - // Read the chunk - Part part0 = msg.getPart(0); - if (part0.isBytes() && part0.getLength() == 1 && part0.getSerializedForm()[0] == 0) { - // REPLY_OKAY from a CloseConnection - break; - } - int batchId = part0.getInt(); - int numEvents = msg.getPart(1).getInt(); - ack = new GatewayAck(batchId, numEvents); - break; - case MessageType.EXCEPTION: - part0 = msg.getPart(0); - - Object obj = part0.getObject(); - if (obj instanceof List) { - List<BatchException70> l = (List<BatchException70>)part0.getObject(); - - // if (logger.isDebugEnabled()) { - logger.info("We got an exception from the GatewayReceiver. 
MessageType : {} obj :{}", msg.getMessageType(), obj); - //} - // don't throw Exception but set it in the Ack - BatchException70 be = new BatchException70(l); - ack = new GatewayAck(be, l.get(0).getBatchId()); - - } else if (obj instanceof Throwable) { - String s = ": While reading Ack from receiver " - + ((Throwable)obj).getMessage(); - throw new ServerOperationException(s, (Throwable)obj); - } - break; - default: - throw new InternalGemFireError( - LocalizedStrings.Op_UNKNOWN_MESSAGE_TYPE_0 - .toLocalizedString(Integer.valueOf(msg.getMessageType()))); - } - } finally { - msg.clear(); - } - return ack; - } - - @Override - protected boolean isErrorResponse(int msgType) { - return false; - } - @Override - protected long startAttempt(ConnectionStats stats) { - return stats.startGatewayBatch(); - } - @Override - protected void endSendAttempt(ConnectionStats stats, long start) { - stats.endGatewayBatchSend(start, hasFailed()); - } - @Override - protected void endAttempt(ConnectionStats stats, long start) { - stats.endGatewayBatch(start, hasTimedOut(), hasFailed()); - } - - @Override - public boolean isGatewaySenderOp() { - return true; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/SenderProxy.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/SenderProxy.java b/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/SenderProxy.java deleted file mode 100644 index 8647d12..0000000 --- a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/SenderProxy.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.cache.client.internal; - -import java.util.List; - -import com.gemstone.gemfire.cache.query.SelectResults; -import com.gemstone.gemfire.distributed.internal.ServerLocation; - -/** - * Used to send operations from a sender to a receiver. - * @since GemFire 8.1 - */ -public class SenderProxy extends ServerProxy{ - public SenderProxy(InternalPool pool) { - super(pool); - } - - public void dispatchBatch_NewWAN(Connection con, List events, int batchId, boolean isRetry) - { - GatewaySenderBatchOp.executeOn(con, this.pool, events, batchId, isRetry); - } - - public Object receiveAckFromReceiver(Connection con) - { - return GatewaySenderBatchOp.executeOn(con, this.pool); - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorDiscovery.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorDiscovery.java b/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorDiscovery.java deleted file mode 100644 index 60973ed..0000000 --- a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorDiscovery.java +++ /dev/null @@ -1,234 +0,0 @@ -/* - * Licensed to the Apache Software Foundation 
(ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.cache.client.internal.locator.wan; - -import com.gemstone.gemfire.distributed.internal.DistributionConfig; -import com.gemstone.gemfire.distributed.internal.WanLocatorDiscoverer; -import com.gemstone.gemfire.distributed.internal.tcpserver.TcpClient; -import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId; -import com.gemstone.gemfire.internal.i18n.LocalizedStrings; -import com.gemstone.gemfire.internal.logging.LogService; -import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; -import com.gemstone.gemfire.internal.net.*; -import com.gemstone.gemfire.internal.tcp.ConnectionException; -import org.apache.logging.log4j.Logger; - -import java.io.IOException; -import java.util.concurrent.ConcurrentHashMap; - -/** - * This class represent a runnable task which exchange the locator information - * with local locators(within the site) as well as remote locators (across the - * site) - * - * @since GemFire 7.0 - */ -public class LocatorDiscovery{ - - private static final Logger logger = LogService.getLogger(); - - private WanLocatorDiscoverer discoverer; - - private DistributionLocatorId locatorId; - - private LocatorMembershipListener locatorListener; - - 
RemoteLocatorJoinRequest request; - - TcpClient locatorClient; - - public static final int WAN_LOCATOR_CONNECTION_RETRY_ATTEMPT = Integer - .getInteger("WANLocator.CONNECTION_RETRY_ATTEMPT", 50000).intValue(); - - public static final int WAN_LOCATOR_CONNECTION_INTERVAL = Integer.getInteger( - "WANLocator.CONNECTION_INTERVAL", 10000).intValue(); - - public static final int WAN_LOCATOR_PING_INTERVAL = Integer.getInteger( - "WANLocator.PING_INTERVAL", 10000).intValue(); - - public LocatorDiscovery(WanLocatorDiscoverer discoverer, DistributionLocatorId locator,RemoteLocatorJoinRequest request, - LocatorMembershipListener locatorListener) { - this.discoverer = discoverer; - this.locatorId = locator; - this.request = request; - this.locatorListener = locatorListener; - this.locatorClient = new TcpClient(); - } - - /** - * When a batch fails, then this keeps the last time when a failure was logged - * . We don't want to swamp the logs in retries due to same batch failures. - */ - private final ConcurrentHashMap<DistributionLocatorId, long[]> failureLogInterval = new ConcurrentHashMap<DistributionLocatorId, long[]>(); - - /** - * The maximum size of {@link #failureLogInterval} beyond which it will start - * logging all failure instances. Hopefully this should never happen in - * practice. - */ - private static final int FAILURE_MAP_MAXSIZE = Integer.getInteger( - DistributionConfig.GEMFIRE_PREFIX + "GatewaySender.FAILURE_MAP_MAXSIZE", 1000000); - - /** - * The maximum interval for logging failures of the same event in millis. 
- */ - private static final int FAILURE_LOG_MAX_INTERVAL = Integer.getInteger( - DistributionConfig.GEMFIRE_PREFIX + "LocatorDiscovery.FAILURE_LOG_MAX_INTERVAL", 300000); - - public final boolean skipFailureLogging(DistributionLocatorId locatorId) { - boolean skipLogging = false; - if (this.failureLogInterval.size() < FAILURE_MAP_MAXSIZE) { - long[] logInterval = this.failureLogInterval.get(locatorId); - if (logInterval == null) { - logInterval = this.failureLogInterval.putIfAbsent(locatorId, - new long[] { System.currentTimeMillis(), 1000 }); - } - if (logInterval != null) { - long currentTime = System.currentTimeMillis(); - if ((currentTime - logInterval[0]) < logInterval[1]) { - skipLogging = true; - } else { - logInterval[0] = currentTime; - if (logInterval[1] <= (FAILURE_LOG_MAX_INTERVAL / 2)) { - logInterval[1] *= 2; - } - } - } - } - return skipLogging; - } - - - public class LocalLocatorDiscovery implements Runnable { - public void run() { - exchangeLocalLocators(); - } - } - - public class RemoteLocatorDiscovery implements Runnable { - public void run() { - exchangeRemoteLocators(); - } - } - - private WanLocatorDiscoverer getDiscoverer() { - return this.discoverer; - } - - private void exchangeLocalLocators() { - int retryAttempt = 1; - while (!getDiscoverer().isStopped()) { - try { - RemoteLocatorJoinResponse response = (RemoteLocatorJoinResponse)locatorClient - .requestToServer(locatorId.getHost(), locatorId.getPort(), request, - WanLocatorDiscoverer.WAN_LOCATOR_CONNECTION_TIMEOUT); - if (response != null) { - LocatorHelper.addExchangedLocators(response.getLocators(), - this.locatorListener); - logger.info(LocalizedMessage.create(LocalizedStrings.LOCATOR_DISCOVERY_TASK_EXCHANGED_LOCATOR_INFORMATION_0_WITH_1, - new Object[] { request.getLocator(), locatorId, response.getLocators() })); - break; - } - } - catch (IOException ioe) { - if (retryAttempt == WAN_LOCATOR_CONNECTION_RETRY_ATTEMPT) { - ConnectionException coe = new ConnectionException( - "Not able 
to connect to local locator after " - + WAN_LOCATOR_CONNECTION_RETRY_ATTEMPT + " retry attempts", - ioe); - logger.fatal(LocalizedMessage.create(LocalizedStrings.LOCATOR_DISCOVERY_TASK_COULD_NOT_EXCHANGE_LOCATOR_INFORMATION_0_WITH_1_AFTER_2, - new Object[] { request.getLocator(),locatorId, retryAttempt }), coe); - break; - } - if (skipFailureLogging(locatorId)) { - logger.warn(LocalizedMessage.create(LocalizedStrings.LOCATOR_DISCOVERY_TASK_COULD_NOT_EXCHANGE_LOCATOR_INFORMATION_0_WITH_1_AFTER_2_RETRYING_IN_3_MS, - new Object[] { request.getLocator(), locatorId, retryAttempt, WAN_LOCATOR_CONNECTION_INTERVAL })); - } - try { - Thread.sleep(WAN_LOCATOR_CONNECTION_INTERVAL); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - retryAttempt++; - continue; - } - catch (ClassNotFoundException classNotFoundException) { - logger.fatal(LocalizedMessage.create(LocalizedStrings.LOCATOR_DISCOVERY_TASK_ENCOUNTERED_UNEXPECTED_EXCEPTION), classNotFoundException); - break; - } - } - } - - public void exchangeRemoteLocators() { - int retryAttempt = 1; - DistributionLocatorId remoteLocator = this.locatorId; - while (!getDiscoverer().isStopped()) { - RemoteLocatorJoinResponse response; - try { - response = (RemoteLocatorJoinResponse)locatorClient - .requestToServer(remoteLocator.getHost(), remoteLocator.getPort(), - request, WanLocatorDiscoverer.WAN_LOCATOR_CONNECTION_TIMEOUT); - if (response != null) { - LocatorHelper.addExchangedLocators(response.getLocators(), this.locatorListener); - logger.info(LocalizedMessage.create(LocalizedStrings.LOCATOR_DISCOVERY_TASK_EXCHANGED_LOCATOR_INFORMATION_0_WITH_1, - new Object[] { request.getLocator(), locatorId, response.getLocators() })); - RemoteLocatorPingRequest pingRequest = new RemoteLocatorPingRequest( - ""); - while (true) { - Thread.sleep(WAN_LOCATOR_PING_INTERVAL); - RemoteLocatorPingResponse pingResponse = (RemoteLocatorPingResponse)locatorClient - .requestToServer(remoteLocator.getHost(), - 
remoteLocator.getPort(), pingRequest, - WanLocatorDiscoverer.WAN_LOCATOR_CONNECTION_TIMEOUT); - if (pingResponse != null) { - continue; - } - break; - } - } - } - catch (IOException ioe) { - if (retryAttempt == WAN_LOCATOR_CONNECTION_RETRY_ATTEMPT) { - logger.fatal(LocalizedMessage.create(LocalizedStrings.LOCATOR_DISCOVERY_TASK_COULD_NOT_EXCHANGE_LOCATOR_INFORMATION_0_WITH_1_AFTER_2, - new Object[] { request.getLocator(), remoteLocator, retryAttempt}), ioe); - break; - } - if (skipFailureLogging(remoteLocator)) { - logger.warn(LocalizedMessage.create(LocalizedStrings.LOCATOR_DISCOVERY_TASK_COULD_NOT_EXCHANGE_LOCATOR_INFORMATION_0_WITH_1_AFTER_2_RETRYING_IN_3_MS, - new Object[] { request.getLocator(), remoteLocator, retryAttempt, WAN_LOCATOR_CONNECTION_INTERVAL })); - } - try { - Thread.sleep(WAN_LOCATOR_CONNECTION_INTERVAL); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - retryAttempt++; - continue; - } - catch (ClassNotFoundException classNotFoundException) { - logger.fatal(LocalizedMessage.create(LocalizedStrings.LOCATOR_DISCOVERY_TASK_ENCOUNTERED_UNEXPECTED_EXCEPTION), classNotFoundException); - break; - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorHelper.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorHelper.java b/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorHelper.java deleted file mode 100644 index 83b6db3..0000000 --- a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorHelper.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.cache.client.internal.locator.wan; - -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import com.gemstone.gemfire.distributed.internal.InternalLocator; -import com.gemstone.gemfire.cache.client.internal.locator.wan.LocatorMembershipListener; -import com.gemstone.gemfire.distributed.internal.WanLocatorDiscoverer; -import com.gemstone.gemfire.internal.CopyOnWriteHashSet; -import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId; -/** - * This is a helper class which helps to add the locator information to the allLocatorInfoMap. - * - * - */ -public class LocatorHelper { - - public final static Object locatorObject = new Object(); - /** - * - * This methods add the given locator to allLocatorInfoMap. - * It also invokes a locatorlistener to inform other locators in allLocatorInfoMap about this newly added locator. 
- * @param distributedSystemId - * @param locator - * @param locatorListener - * @param sourceLocator - */ - public static boolean addLocator(int distributedSystemId, - DistributionLocatorId locator, LocatorMembershipListener locatorListener, - DistributionLocatorId sourceLocator) { - ConcurrentHashMap<Integer, Set<DistributionLocatorId>> allLocatorsInfo = (ConcurrentHashMap<Integer, Set<DistributionLocatorId>>)locatorListener - .getAllLocatorsInfo(); - Set<DistributionLocatorId> locatorsSet = new CopyOnWriteHashSet<DistributionLocatorId>(); - locatorsSet.add(locator); - Set<DistributionLocatorId> existingValue = allLocatorsInfo.putIfAbsent(distributedSystemId, locatorsSet); - if(existingValue != null){ - if (!existingValue.contains(locator)) { - existingValue.add(locator); - addServerLocator(distributedSystemId, locatorListener, locator); - locatorListener.locatorJoined(distributedSystemId, locator, - sourceLocator); - } - else { - return false; - } - }else{ - addServerLocator(distributedSystemId, locatorListener, locator); - locatorListener.locatorJoined(distributedSystemId, locator, - sourceLocator); - } - return true; - } - - /** - * This methods decides whether the given locator is server locator, if so - * then add this locator in allServerLocatorsInfo map. 
- * - * @param distributedSystemId - * @param locatorListener - * @param locator - */ - private static void addServerLocator(Integer distributedSystemId, - LocatorMembershipListener locatorListener, DistributionLocatorId locator) { - if (!locator.isServerLocator()) { - return; - } - ConcurrentHashMap<Integer, Set<String>> allServerLocatorsInfo = (ConcurrentHashMap<Integer, Set<String>>)locatorListener - .getAllServerLocatorsInfo(); - - Set<String> locatorsSet = new CopyOnWriteHashSet<String>(); - locatorsSet.add(locator.toString()); - Set<String> existingValue = allServerLocatorsInfo.putIfAbsent(distributedSystemId, locatorsSet); - if(existingValue != null){ - if (!existingValue.contains(locator.toString())) { - existingValue.add(locator.toString()); - } - } - } - - /** - * This method adds the map of locatorsinfo sent by other locator to this locator's allLocatorInfo - * - * @param locators - * @param locatorListener - */ - public static boolean addExchangedLocators(Map<Integer, Set<DistributionLocatorId>> locators, - LocatorMembershipListener locatorListener) { - - ConcurrentHashMap<Integer, Set<DistributionLocatorId>> allLocators = (ConcurrentHashMap<Integer, Set<DistributionLocatorId>>)locatorListener - .getAllLocatorsInfo(); - if (!allLocators.equals(locators)) { - for (Map.Entry<Integer, Set<DistributionLocatorId>> entry : locators - .entrySet()) { - Set<DistributionLocatorId> existingValue = allLocators.putIfAbsent( - entry.getKey(), new CopyOnWriteHashSet<DistributionLocatorId>(entry - .getValue())); - - if (existingValue != null) { - Set<DistributionLocatorId> localLocators = allLocators.get(entry - .getKey()); - if (!localLocators.equals(entry.getValue())) { - entry.getValue().removeAll(localLocators); - for (DistributionLocatorId locator : entry.getValue()) { - localLocators.add(locator); - addServerLocator(entry.getKey(), locatorListener, locator); - locatorListener.locatorJoined(entry.getKey(), locator, null); - } - } - - } - else { - for 
(DistributionLocatorId locator : entry.getValue()) { - addServerLocator(entry.getKey(), locatorListener, locator); - locatorListener.locatorJoined(entry.getKey(), locator, null); - } - } - } - return true; - } - return false; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorJoinMessage.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorJoinMessage.java b/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorJoinMessage.java deleted file mode 100644 index 86ae0d6..0000000 --- a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorJoinMessage.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.gemstone.gemfire.cache.client.internal.locator.wan; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import com.gemstone.gemfire.DataSerializer; -import com.gemstone.gemfire.cache.client.internal.locator.ServerLocationRequest; -import com.gemstone.gemfire.internal.DataSerializableFixedID; -import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId; - -public class LocatorJoinMessage extends ServerLocationRequest { - - private DistributionLocatorId locator; - - private int distributedSystemId; - - private DistributionLocatorId sourceLocator; - - public LocatorJoinMessage() { - super(); - } - - public LocatorJoinMessage(int distributedSystemId, DistributionLocatorId locator, - DistributionLocatorId sourceLocator, String serverGroup) { - super(serverGroup); - this.locator = locator; - this.distributedSystemId = distributedSystemId; - this.sourceLocator = sourceLocator; - } - - public void fromData(DataInput in) throws IOException, ClassNotFoundException { - super.fromData(in); - this.locator = DataSerializer.readObject(in); - this.distributedSystemId = in.readInt(); - this.sourceLocator = DataSerializer.readObject(in); - } - - public void toData(DataOutput out) throws IOException { - super.toData(out); - DataSerializer.writeObject(locator, out); - out.writeInt(this.distributedSystemId); - DataSerializer.writeObject(sourceLocator, out); - } - - public DistributionLocatorId getLocator() { - return this.locator; - } - - public int getDistributedSystemId() { - return distributedSystemId; - } - - public DistributionLocatorId getSourceLocator() { - return sourceLocator; - } - - public int getDSFID() { - return DataSerializableFixedID.LOCATOR_JOIN_MESSAGE; - } - - @Override - public String toString() { - return "LocatorJoinMessage{distributedSystemId="+ distributedSystemId +" locators=" + locator + " Source Locator : " + sourceLocator +"}"; - } - - @Override - public boolean equals(Object obj){ - if ( 
this == obj ) return true; - if ( !(obj instanceof LocatorJoinMessage) ) return false; - LocatorJoinMessage myObject = (LocatorJoinMessage)obj; - if((this.distributedSystemId == myObject.getDistributedSystemId()) && this.locator.equals(myObject.getLocator())){ - return true; - } - return false; - } - - @Override - public int hashCode() { - // it is sufficient for all messages having the same locator to hash to the same bucket - if (this.locator == null) { - return 0; - } else { - return this.locator.hashCode(); - } - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorMembershipListenerImpl.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorMembershipListenerImpl.java b/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorMembershipListenerImpl.java deleted file mode 100644 index 01e01e7..0000000 --- a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorMembershipListenerImpl.java +++ /dev/null @@ -1,213 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.cache.client.internal.locator.wan; - -import com.gemstone.gemfire.distributed.internal.DistributionConfig; -import com.gemstone.gemfire.distributed.internal.tcpserver.TcpClient; -import com.gemstone.gemfire.internal.CopyOnWriteHashSet; -import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId; -import com.gemstone.gemfire.internal.i18n.LocalizedStrings; -import com.gemstone.gemfire.internal.logging.LogService; -import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; -import org.apache.logging.log4j.Logger; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -/** - * An implementation of - * {@link com.gemstone.gemfire.cache.client.internal.locator.wan.LocatorMembershipListener} - * - * - */ -public class LocatorMembershipListenerImpl implements LocatorMembershipListener { - - private ConcurrentMap<Integer, Set<DistributionLocatorId>> allLocatorsInfo = new ConcurrentHashMap<Integer, Set<DistributionLocatorId>>(); - - private ConcurrentMap<Integer, Set<String>> allServerLocatorsInfo = new ConcurrentHashMap<Integer, Set<String>>(); - - private static final Logger logger = LogService.getLogger(); - - private DistributionConfig config; - - private TcpClient tcpClient; - - private int port; - - public LocatorMembershipListenerImpl() { - this.tcpClient = new TcpClient(); - } - - public void setPort(int port){ - this.port = port; - } - - public void setConfig(DistributionConfig config) { - this.config = config; - } - - /** - * When the new locator is added to remote locator metadata, inform all other - * locators in remote locator metadata about the new locator so that they can - * update their remote locator metadata. 
- * - * @param locator - */ - - public void locatorJoined(final int distributedSystemId, - final DistributionLocatorId locator, - final DistributionLocatorId sourceLocator) { - Thread distributeLocator = new Thread(new Runnable() { - public void run() { - ConcurrentMap<Integer, Set<DistributionLocatorId>> remoteLocators = getAllLocatorsInfo(); - ArrayList<DistributionLocatorId> locatorsToRemove = new ArrayList<DistributionLocatorId>(); - - String localLocator = config.getStartLocator(); - DistributionLocatorId localLocatorId = null; - if (localLocator.equals(DistributionConfig.DEFAULT_START_LOCATOR)) { - localLocatorId = new DistributionLocatorId(port, config - .getBindAddress()); - } - else { - localLocatorId = new DistributionLocatorId(localLocator); - } - locatorsToRemove.add(localLocatorId); - locatorsToRemove.add(locator); - locatorsToRemove.add(sourceLocator); - - Map<Integer, Set<DistributionLocatorId>> localCopy = new HashMap<Integer, Set<DistributionLocatorId>>(); - for(Map.Entry<Integer, Set<DistributionLocatorId>> entry : remoteLocators.entrySet()){ - Set<DistributionLocatorId> value = new CopyOnWriteHashSet<DistributionLocatorId>(entry.getValue()); - localCopy.put(entry.getKey(), value); - } - for(Map.Entry<Integer, Set<DistributionLocatorId>> entry : localCopy.entrySet()){ - for(DistributionLocatorId removeLocId : locatorsToRemove){ - if(entry.getValue().contains(removeLocId)){ - entry.getValue().remove(removeLocId); - } - } - for (DistributionLocatorId value : entry.getValue()) { - try { - tcpClient.requestToServer(value.getHost(), value.getPort(), - new LocatorJoinMessage(distributedSystemId, locator, localLocatorId, ""), 1000, false); - } - catch (Exception e) { - if (logger.isDebugEnabled()) { - logger.debug(LocalizedMessage.create(LocalizedStrings.LOCATOR_MEMBERSHIP_LISTENER_COULD_NOT_EXCHANGE_LOCATOR_INFORMATION_0_1_WIHT_2_3, - new Object[] { locator.getHost(), locator.getPort(), value.getHost(), value.getPort() })); - } - } - try { - 
tcpClient.requestToServer(locator.getHost(), locator.getPort(), - new LocatorJoinMessage(entry.getKey(), value, localLocatorId, ""), 1000, false); - } - catch (Exception e) { - if (logger.isDebugEnabled()) { - logger.debug(LocalizedMessage.create(LocalizedStrings.LOCATOR_MEMBERSHIP_LISTENER_COULD_NOT_EXCHANGE_LOCATOR_INFORMATION_0_1_WIHT_2_3, - new Object[] { value.getHost(), value.getPort(), locator.getHost(), locator.getPort() })); - } - } - } - } - } - }); - distributeLocator.setDaemon(true); - distributeLocator.start(); - } - - public Object handleRequest(Object request) { - Object response = null; - if (request instanceof RemoteLocatorJoinRequest) { - response = updateAllLocatorInfo((RemoteLocatorJoinRequest)request); - } - else if (request instanceof LocatorJoinMessage) { - response = informAboutRemoteLocators((LocatorJoinMessage)request); - } - else if (request instanceof RemoteLocatorPingRequest) { - response = getPingResponse((RemoteLocatorPingRequest)request); - } - else if (request instanceof RemoteLocatorRequest) { - response = getRemoteLocators((RemoteLocatorRequest)request); - } - return response; - } - - /** - * A locator from the request is checked against the existing remote locator - * metadata. If it is not available then added to existing remote locator - * metadata and LocatorMembershipListener is invoked to inform about the - * this newly added locator to all other locators available in remote locator - * metadata. As a response, remote locator metadata is sent. 
- * - * @param request - */ - private synchronized Object updateAllLocatorInfo(RemoteLocatorJoinRequest request) { - int distributedSystemId = request.getDistributedSystemId(); - DistributionLocatorId locator = request.getLocator(); - - LocatorHelper.addLocator(distributedSystemId, locator, this, null); - return new RemoteLocatorJoinResponse(this.getAllLocatorsInfo()); - } - - private Object getPingResponse(RemoteLocatorPingRequest request) { - return new RemoteLocatorPingResponse(); - } - - private Object informAboutRemoteLocators(LocatorJoinMessage request){ - // TODO: FInd out the importance of list locatorJoinMessages. During - // refactoring I could not understand its significance -// synchronized (locatorJoinObject) { -// if (locatorJoinMessages.contains(request)) { -// return null; -// } -// locatorJoinMessages.add(request); -// } - int distributedSystemId = request.getDistributedSystemId(); - DistributionLocatorId locator = request.getLocator(); - DistributionLocatorId sourceLocatorId = request.getSourceLocator(); - - LocatorHelper.addLocator(distributedSystemId, locator, this, sourceLocatorId); - return null; - } - - private Object getRemoteLocators(RemoteLocatorRequest request) { - int dsId = request.getDsId(); - Set<String> locators = this.getRemoteLocatorInfo(dsId); - return new RemoteLocatorResponse(locators); - } - - public Set<String> getRemoteLocatorInfo(int dsId) { - return this.allServerLocatorsInfo.get(dsId); - } - - public ConcurrentMap<Integer,Set<DistributionLocatorId>> getAllLocatorsInfo() { - return this.allLocatorsInfo; - } - - public ConcurrentMap<Integer,Set<String>> getAllServerLocatorsInfo() { - return this.allServerLocatorsInfo; - } - - public void clearLocatorInfo(){ - allLocatorsInfo.clear(); - allServerLocatorsInfo.clear(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorJoinRequest.java 
---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorJoinRequest.java b/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorJoinRequest.java deleted file mode 100644 index c058333..0000000 --- a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorJoinRequest.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.gemstone.gemfire.cache.client.internal.locator.wan; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import com.gemstone.gemfire.DataSerializer; -import com.gemstone.gemfire.cache.client.internal.locator.ServerLocationRequest; -import com.gemstone.gemfire.internal.DataSerializableFixedID; -import com.gemstone.gemfire.internal.Version; -import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId; - -/** - * Requests remote locators of a remote WAN site - * - * - * @since GemFire 6.6 - * - */ -public class RemoteLocatorJoinRequest implements DataSerializableFixedID { - - private DistributionLocatorId locator = null; - - private int distributedSystemId = -1; - - public RemoteLocatorJoinRequest() { - super(); - } - - public RemoteLocatorJoinRequest(int distributedSystemId, DistributionLocatorId locator, - String serverGroup) { - this.distributedSystemId = distributedSystemId; - this.locator = locator; - } - - public void fromData(DataInput in) throws IOException, ClassNotFoundException { - this.locator = DataSerializer.readObject(in); - this.distributedSystemId = in.readInt(); - } - - public void toData(DataOutput out) throws IOException { - DataSerializer.writeObject(locator, out); - out.writeInt(this.distributedSystemId); - } - - public DistributionLocatorId getLocator() { - return this.locator; - } - - public int getDistributedSystemId() { - return distributedSystemId; - } - - public int getDSFID() { - return DataSerializableFixedID.REMOTE_LOCATOR_JOIN_REQUEST; - } - - @Override - public String toString() { - return "RemoteLocatorJoinRequest{locator=" + locator + "}"; - } - - @Override - public Version[] getSerializationVersions() { - return null; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorJoinResponse.java 
---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorJoinResponse.java b/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorJoinResponse.java deleted file mode 100644 index 42c3bb0..0000000 --- a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorJoinResponse.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.gemstone.gemfire.cache.client.internal.locator.wan; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -import com.gemstone.gemfire.DataSerializer; -import com.gemstone.gemfire.internal.CopyOnWriteHashSet; -import com.gemstone.gemfire.internal.DataSerializableFixedID; -import com.gemstone.gemfire.internal.Version; -import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId; - -/** - * List of remote locators as a response - * - * - * - */ -public class RemoteLocatorJoinResponse implements DataSerializableFixedID{ - - private HashMap<Integer, Set<DistributionLocatorId>> locators = new HashMap<Integer, Set<DistributionLocatorId>>(); - - /** Used by DataSerializer */ - public RemoteLocatorJoinResponse() { - super(); - } - - public RemoteLocatorJoinResponse( - Map<Integer, Set<DistributionLocatorId>> locators) { - super(); - this.locators = new HashMap<Integer, Set<DistributionLocatorId>>(); - for (Map.Entry<Integer, Set<DistributionLocatorId>> entry : locators - .entrySet()) { - this.locators.put(entry.getKey(), new CopyOnWriteHashSet<DistributionLocatorId>( - entry.getValue())); - } - } - - public void fromData(DataInput in) throws IOException, ClassNotFoundException { - this.locators = DataSerializer.readHashMap(in); - - } - - public void toData(DataOutput out) throws IOException { - DataSerializer.writeHashMap(locators, out); - } - - public Map<Integer, Set<DistributionLocatorId>> getLocators() { - return this.locators; - } - - @Override - public String toString() { - return "RemoteLocatorJoinResponse{locators=" + locators + "}"; - } - - public int getDSFID() { - return DataSerializableFixedID.REMOTE_LOCATOR_JOIN_RESPONSE; - } - - @Override - public Version[] getSerializationVersions() { - return null; - } - -} 
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorPingRequest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorPingRequest.java b/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorPingRequest.java deleted file mode 100644 index a1cb951..0000000 --- a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorPingRequest.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.gemstone.gemfire.cache.client.internal.locator.wan; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import com.gemstone.gemfire.internal.DataSerializableFixedID; -import com.gemstone.gemfire.internal.Version; - -/** - * - * - */ - -public class RemoteLocatorPingRequest implements DataSerializableFixedID{ - - public RemoteLocatorPingRequest() { - super(); - } - - public RemoteLocatorPingRequest(String serverGroup) { - } - - public void fromData(DataInput in) throws IOException, ClassNotFoundException { - } - - public void toData(DataOutput out) throws IOException { - } - - public int getDSFID() { - return DataSerializableFixedID.REMOTE_LOCATOR_PING_REQUEST; - } - - @Override - public Version[] getSerializationVersions() { - return null; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorPingResponse.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorPingResponse.java b/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorPingResponse.java deleted file mode 100644 index 54fdc04..0000000 --- a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorPingResponse.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.cache.client.internal.locator.wan; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import com.gemstone.gemfire.internal.DataSerializableFixedID; -import com.gemstone.gemfire.internal.Version; - -/** - * - */ -public class RemoteLocatorPingResponse implements DataSerializableFixedID { - - - /** Used by DataSerializer */ - public RemoteLocatorPingResponse() { - super(); - } - - public void fromData(DataInput in) throws IOException, ClassNotFoundException { - } - - public void toData(DataOutput out) throws IOException { - } - - - - public int getDSFID() { - return DataSerializableFixedID.REMOTE_LOCATOR_PING_RESPONSE; - } - - @Override - public Version[] getSerializationVersions() { - return null; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorRequest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorRequest.java b/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorRequest.java deleted file mode 100644 index d23ca93..0000000 --- a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorRequest.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.cache.client.internal.locator.wan; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import com.gemstone.gemfire.internal.DataSerializableFixedID; -import com.gemstone.gemfire.internal.Version; -/** - * - * - */ -public class RemoteLocatorRequest implements DataSerializableFixedID{ - private int distributedSystemId ; - - public RemoteLocatorRequest() { - super(); - } - public RemoteLocatorRequest(int dsId, String serverGroup) { - this.distributedSystemId = dsId; - } - - public void fromData(DataInput in) throws IOException, ClassNotFoundException { - this.distributedSystemId = in.readInt(); - } - - public void toData(DataOutput out) throws IOException { - out.writeInt(this.distributedSystemId); - } - - public int getDsId() { - return this.distributedSystemId; - } - - public int getDSFID() { - return DataSerializableFixedID.REMOTE_LOCATOR_REQUEST; - } - - @Override - public String toString() { - return "RemoteLocatorRequest{dsName=" + distributedSystemId + "}"; - } - @Override - public Version[] getSerializationVersions() { - return null; - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorResponse.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorResponse.java b/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorResponse.java deleted file mode 100644 index a7ff6fd..0000000 --- a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorResponse.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.gemstone.gemfire.cache.client.internal.locator.wan; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.Set; - -import com.gemstone.gemfire.DataSerializer; -import com.gemstone.gemfire.internal.DataSerializableFixedID; -import com.gemstone.gemfire.internal.Version; - -/** - * - * - */ -public class RemoteLocatorResponse implements DataSerializableFixedID{ - - private Set<String> locators ; - - /** Used by DataSerializer */ - public RemoteLocatorResponse() { - super(); - } - - public RemoteLocatorResponse(Set<String> locators) { - this.locators = locators; - } - - public void fromData(DataInput in) throws IOException, ClassNotFoundException { - this.locators = DataSerializer.readObject(in); - } - - public void toData(DataOutput out) throws IOException { - DataSerializer.writeObject(this.locators, out); - } - - public Set<String> getLocators() { - return this.locators; - } - - @Override - public String toString() { - return "RemoteLocatorResponse{locators=" + locators +"}"; - } - - public int getDSFID() { - return DataSerializableFixedID.REMOTE_LOCATOR_RESPONSE; - } - - @Override - public Version[] getSerializationVersions() { - return null; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/WANFactoryImpl.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/WANFactoryImpl.java b/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/WANFactoryImpl.java deleted file mode 100644 index 2844594..0000000 --- a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/WANFactoryImpl.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.cache.client.internal.locator.wan; - -import com.gemstone.gemfire.cache.Cache; -import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory; -import com.gemstone.gemfire.cache.wan.GatewaySenderFactory; -import com.gemstone.gemfire.distributed.internal.WanLocatorDiscoverer; -import com.gemstone.gemfire.internal.DSFIDFactory; -import com.gemstone.gemfire.internal.DataSerializableFixedID; -import com.gemstone.gemfire.internal.cache.wan.GatewayReceiverFactoryImpl; -import com.gemstone.gemfire.internal.cache.wan.GatewaySenderFactoryImpl; -import com.gemstone.gemfire.internal.cache.wan.spi.WANFactory; - -public class WANFactoryImpl implements WANFactory { - - @Override - public void initialize() { - DSFIDFactory.registerDSFID( - DataSerializableFixedID.REMOTE_LOCATOR_JOIN_REQUEST, - RemoteLocatorJoinRequest.class); - DSFIDFactory.registerDSFID( - DataSerializableFixedID.REMOTE_LOCATOR_JOIN_RESPONSE, - RemoteLocatorJoinResponse.class); - DSFIDFactory.registerDSFID(DataSerializableFixedID.REMOTE_LOCATOR_REQUEST, - RemoteLocatorRequest.class); - DSFIDFactory.registerDSFID(DataSerializableFixedID.LOCATOR_JOIN_MESSAGE, - LocatorJoinMessage.class); - DSFIDFactory.registerDSFID( - DataSerializableFixedID.REMOTE_LOCATOR_PING_REQUEST, - 
RemoteLocatorPingRequest.class); - DSFIDFactory.registerDSFID( - DataSerializableFixedID.REMOTE_LOCATOR_PING_RESPONSE, - RemoteLocatorPingResponse.class); - DSFIDFactory.registerDSFID(DataSerializableFixedID.REMOTE_LOCATOR_RESPONSE, - RemoteLocatorResponse.class); - } - - @Override - public GatewaySenderFactory createGatewaySenderFactory(Cache cache) { - return new GatewaySenderFactoryImpl(cache); - } - - @Override - public GatewayReceiverFactory createGatewayReceiverFactory(Cache cache) { - return new GatewayReceiverFactoryImpl(cache); - } - - @Override - public WanLocatorDiscoverer createLocatorDiscoverer() { - return new WanLocatorDiscovererImpl(); - } - - @Override - public LocatorMembershipListener createLocatorMembershipListener() { - return new LocatorMembershipListenerImpl(); - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/WanLocatorDiscovererImpl.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/WanLocatorDiscovererImpl.java b/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/WanLocatorDiscovererImpl.java deleted file mode 100644 index 1d44e65..0000000 --- a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/WanLocatorDiscovererImpl.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.cache.client.internal.locator.wan; - -import com.gemstone.gemfire.distributed.internal.DistributionConfig; -import com.gemstone.gemfire.distributed.internal.DistributionConfigImpl; -import com.gemstone.gemfire.distributed.internal.WanLocatorDiscoverer; -import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId; -import com.gemstone.gemfire.internal.logging.LogService; -import com.gemstone.gemfire.internal.logging.LoggingThreadGroup; -import org.apache.logging.log4j.Logger; - -import java.util.StringTokenizer; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; - -public class WanLocatorDiscovererImpl implements WanLocatorDiscoverer{ - - private static final Logger logger = LogService.getLogger(); - - private volatile boolean stopped = false; - - private ExecutorService _executor; - - public WanLocatorDiscovererImpl() { - - } - - @Override - public void discover(int port, - DistributionConfigImpl config, - LocatorMembershipListener locatorListener, - final String hostnameForClients) { - final LoggingThreadGroup loggingThreadGroup = LoggingThreadGroup - .createThreadGroup("WAN Locator Discovery Logger Group", logger); - - final ThreadFactory threadFactory = new ThreadFactory() { - public Thread newThread(final Runnable task) { - final Thread thread = new Thread(loggingThreadGroup, task, - "WAN Locator Discovery Thread"); - thread.setDaemon(true); - return thread; - } - }; - - this._executor = 
Executors.newCachedThreadPool(threadFactory); - exchangeLocalLocators(port, config, locatorListener, hostnameForClients); - exchangeRemoteLocators(port, config, locatorListener, hostnameForClients); - this._executor.shutdown(); - } - - @Override - public void stop() { - this.stopped = true; - } - - @Override - public boolean isStopped() { - return this.stopped; - } - - /** - * For WAN 70 Exchange the locator information within the distributed system - * - * @param config - * @param hostnameForClients - */ - private void exchangeLocalLocators(int port, - DistributionConfigImpl config, - LocatorMembershipListener locatorListener, - final String hostnameForClients) { - String localLocator = config.getStartLocator(); - DistributionLocatorId locatorId = null; - if (localLocator.equals(DistributionConfig.DEFAULT_START_LOCATOR)) { - locatorId = new DistributionLocatorId(port, config.getBindAddress(), hostnameForClients); - } - else { - locatorId = new DistributionLocatorId(localLocator); - } - LocatorHelper.addLocator(config.getDistributedSystemId(), locatorId, locatorListener, null); - - RemoteLocatorJoinRequest request = buildRemoteDSJoinRequest(port, config, hostnameForClients); - StringTokenizer locatorsOnThisVM = new StringTokenizer( - config.getLocators(), ","); - while (locatorsOnThisVM.hasMoreTokens()) { - DistributionLocatorId localLocatorId = new DistributionLocatorId( - locatorsOnThisVM.nextToken()); - if (!locatorId.equals(localLocatorId)) { - LocatorDiscovery localDiscovery = new LocatorDiscovery(this, localLocatorId, request, locatorListener); - LocatorDiscovery.LocalLocatorDiscovery localLocatorDiscovery = localDiscovery.new LocalLocatorDiscovery(); - this._executor.execute(localLocatorDiscovery); - } - } - } - - /** - * For WAN 70 Exchange the locator information across the distributed systems - * (sites) - * - * @param config - * @param hostnameForClients - */ - private void exchangeRemoteLocators(int port, - DistributionConfigImpl config, - 
LocatorMembershipListener locatorListener, - final String hostnameForClients) { - RemoteLocatorJoinRequest request = buildRemoteDSJoinRequest(port, config, hostnameForClients); - String remoteDistributedSystems = config.getRemoteLocators(); - if (remoteDistributedSystems.length() > 0) { - StringTokenizer remoteLocators = new StringTokenizer( - remoteDistributedSystems, ","); - while (remoteLocators.hasMoreTokens()) { - DistributionLocatorId remoteLocatorId = new DistributionLocatorId( - remoteLocators.nextToken()); - LocatorDiscovery localDiscovery = new LocatorDiscovery(this, remoteLocatorId, - request, locatorListener); - LocatorDiscovery.RemoteLocatorDiscovery remoteLocatorDiscovery = localDiscovery.new RemoteLocatorDiscovery(); - this._executor.execute(remoteLocatorDiscovery); - } - } - } - - private RemoteLocatorJoinRequest buildRemoteDSJoinRequest(int port, - DistributionConfigImpl config, - final String hostnameForClients) { - String localLocator = config.getStartLocator(); - DistributionLocatorId locatorId = null; - if (localLocator.equals(DistributionConfig.DEFAULT_START_LOCATOR)) { - locatorId = new DistributionLocatorId(port, config.getBindAddress(), hostnameForClients); - } - else { - locatorId = new DistributionLocatorId(localLocator); - } - RemoteLocatorJoinRequest request = new RemoteLocatorJoinRequest( - config.getDistributedSystemId(), locatorId, ""); - return request; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractRemoteGatewaySender.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractRemoteGatewaySender.java b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractRemoteGatewaySender.java deleted file mode 100644 index 163a611..0000000 --- 
a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractRemoteGatewaySender.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.wan; - -import com.gemstone.gemfire.cache.Cache; -import com.gemstone.gemfire.cache.client.PoolManager; -import com.gemstone.gemfire.cache.client.internal.PoolImpl; -import com.gemstone.gemfire.cache.client.internal.locator.wan.RemoteLocatorRequest; -import com.gemstone.gemfire.cache.client.internal.locator.wan.RemoteLocatorResponse; -import com.gemstone.gemfire.cache.wan.GatewayReceiver; -import com.gemstone.gemfire.distributed.internal.WanLocatorDiscoverer; -import com.gemstone.gemfire.distributed.internal.tcpserver.TcpClient; -import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId; -import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; -import com.gemstone.gemfire.internal.cache.PoolFactoryImpl; -import com.gemstone.gemfire.internal.i18n.LocalizedStrings; -import com.gemstone.gemfire.internal.logging.LogService; -import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; -import org.apache.logging.log4j.Logger; - -import java.io.IOException; -import 
java.net.ConnectException; -import java.util.Iterator; -import java.util.StringTokenizer; - -public abstract class AbstractRemoteGatewaySender extends AbstractGatewaySender { - private static final Logger logger = LogService.getLogger(); - - public AbstractRemoteGatewaySender() { - - } - public AbstractRemoteGatewaySender(Cache cache, GatewaySenderAttributes attrs){ - super(cache, attrs); - } - - /** used to reduce warning logs in case remote locator is down (#47634) */ - protected int proxyFailureTries = 0; - - public synchronized void initProxy() { - // return if it is being used for WBCL or proxy is already created - if (this.remoteDSId == DEFAULT_DISTRIBUTED_SYSTEM_ID || this.proxy != null - && !this.proxy.isDestroyed()) { - return; - } - - int locatorCount = 0; - PoolFactoryImpl pf = (PoolFactoryImpl) PoolManager.createFactory(); - pf.setPRSingleHopEnabled(false); - if (this.locatorDiscoveryCallback != null) { - pf.setLocatorDiscoveryCallback(locatorDiscoveryCallback); - } - pf.setReadTimeout(this.socketReadTimeout); - pf.setIdleTimeout(connectionIdleTimeOut); - pf.setSocketBufferSize(socketBufferSize); - pf.setServerGroup(GatewayReceiver.RECEIVER_GROUP); - RemoteLocatorRequest request = new RemoteLocatorRequest(this.remoteDSId, pf - .getPoolAttributes().getServerGroup()); - String locators = ((GemFireCacheImpl) this.cache).getDistributedSystem() - .getConfig().getLocators(); - if (logger.isDebugEnabled()) { - logger.debug("Gateway Sender is attempting to configure pool with remote locator information"); - } - StringTokenizer locatorsOnThisVM = new StringTokenizer(locators, ","); - while (locatorsOnThisVM.hasMoreTokens()) { - String localLocator = locatorsOnThisVM.nextToken(); - DistributionLocatorId locatorID = new DistributionLocatorId(localLocator); - try { - RemoteLocatorResponse response = (RemoteLocatorResponse) new TcpClient() - .requestToServer(locatorID.getHost(), locatorID.getPort(), request, - WanLocatorDiscoverer.WAN_LOCATOR_CONNECTION_TIMEOUT); - 
- if (response != null) { - if (response.getLocators() == null) { - if (logProxyFailure()) { - logger.warn(LocalizedMessage.create( - LocalizedStrings.AbstractGatewaySender_REMOTE_LOCATOR_FOR_REMOTE_SITE_0_IS_NOT_AVAILABLE_IN_LOCAL_LOCATOR_1, - new Object[] { remoteDSId, localLocator })); - } - continue; - } - if (logger.isDebugEnabled()) { - logger.debug("Received the remote site {} location information:", this.remoteDSId, response.getLocators()); - } - StringBuffer strBuffer = new StringBuffer(); - Iterator<String> itr = response.getLocators().iterator(); - while (itr.hasNext()) { - DistributionLocatorId locatorId = new DistributionLocatorId(itr.next()); - pf.addLocator(locatorId.getHost().getHostName(), locatorId.getPort()); - locatorCount++; - } - break; - } - } catch (IOException ioe) { - if (logProxyFailure()) { - // don't print stack trace for connection failures - String ioeStr = ""; - if (!logger.isDebugEnabled() && ioe instanceof ConnectException) { - ioeStr = ": " + ioe.toString(); - ioe = null; - } - logger.warn(LocalizedMessage.create( - LocalizedStrings.AbstractGatewaySender_SENDER_0_IS_NOT_ABLE_TO_CONNECT_TO_LOCAL_LOCATOR_1, - new Object[] { this.id, localLocator + ioeStr }), ioe); - } - continue; - } catch (ClassNotFoundException e) { - if (logProxyFailure()) { - logger.warn(LocalizedMessage.create( - LocalizedStrings.AbstractGatewaySender_SENDER_0_IS_NOT_ABLE_TO_CONNECT_TO_LOCAL_LOCATOR_1, - new Object[] { this.id, localLocator }), e); - } - continue; - } - } - - if (locatorCount == 0) { - if (logProxyFailure()) { - logger.fatal(LocalizedMessage.create( - LocalizedStrings.AbstractGatewaySender_SENDER_0_COULD_NOT_GET_REMOTE_LOCATOR_INFORMATION_FOR_SITE_1, - new Object[] { this.id, this.remoteDSId })); - } - this.proxyFailureTries++; - throw new GatewaySenderConfigurationException( - LocalizedStrings.AbstractGatewaySender_SENDER_0_COULD_NOT_GET_REMOTE_LOCATOR_INFORMATION_FOR_SITE_1 - .toLocalizedString(new Object[] { this.id, this.remoteDSId})); - } 
- pf.init(this); - this.proxy = ((PoolImpl) pf.create(this.getId())); - if (this.proxyFailureTries > 0) { - logger.info(LocalizedMessage.create(LocalizedStrings.AbstractGatewaySender_SENDER_0_GOT_REMOTE_LOCATOR_INFORMATION_FOR_SITE_1, - new Object[] { this.id, this.remoteDSId, this.proxyFailureTries })); - this.proxyFailureTries = 0; - } - } - - protected boolean logProxyFailure() { - assert Thread.holdsLock(this); - // always log the first failure - if (logger.isDebugEnabled() || this.proxyFailureTries == 0) { - return true; - } else { - // subsequent failures will be logged on 30th, 300th, 3000th try - // each try is at 100millis from higher layer so this accounts for logging - // after 3s, 30s and then every 5mins - if (this.proxyFailureTries >= 3000) { - return (this.proxyFailureTries % 3000) == 0; - } else { - return (this.proxyFailureTries == 30 || this.proxyFailureTries == 300); - } - } - } -}
