http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewayReceiverFactoryImpl.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewayReceiverFactoryImpl.java b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewayReceiverFactoryImpl.java deleted file mode 100644 index c5c2a2f..0000000 --- a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewayReceiverFactoryImpl.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.gemstone.gemfire.internal.cache.wan; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import com.gemstone.gemfire.cache.Cache; -import com.gemstone.gemfire.cache.wan.GatewayReceiver; -import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory; -import com.gemstone.gemfire.cache.wan.GatewayTransportFilter; -import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; -import com.gemstone.gemfire.distributed.internal.ResourceEvent; -import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; -import com.gemstone.gemfire.internal.cache.xmlcache.CacheCreation; -import com.gemstone.gemfire.internal.cache.xmlcache.GatewayReceiverCreation; -import com.gemstone.gemfire.internal.i18n.LocalizedStrings; - -/** - * - * @since GemFire 7.0 - */ -public class GatewayReceiverFactoryImpl implements GatewayReceiverFactory { - - private int startPort = GatewayReceiver.DEFAULT_START_PORT; - - private int endPort = GatewayReceiver.DEFAULT_END_PORT; - - private int timeBetPings = GatewayReceiver.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS; - - private int socketBuffSize = GatewayReceiver.DEFAULT_SOCKET_BUFFER_SIZE; - - private String bindAdd= GatewayReceiver.DEFAULT_BIND_ADDRESS; - - private String hostnameForSenders = GatewayReceiver.DEFAULT_HOSTNAME_FOR_SENDERS; - - private boolean manualStart = GatewayReceiver.DEFAULT_MANUAL_START; - - private List<GatewayTransportFilter> filters = new ArrayList<GatewayTransportFilter>(); - - private Cache cache; - - public GatewayReceiverFactoryImpl() { - - } - public GatewayReceiverFactoryImpl(Cache cache) { - this.cache = cache; - } - - public GatewayReceiverFactory addGatewayTransportFilter( - GatewayTransportFilter filter) { - this.filters.add(filter); - return this; - } - - public GatewayReceiverFactory removeGatewayTransportFilter( - GatewayTransportFilter filter) { - this.filters.remove(filter); - return this; - } - - public GatewayReceiverFactory setMaximumTimeBetweenPings(int 
time) { - this.timeBetPings = time; - return this; - } - - public GatewayReceiverFactory setStartPort(int port) { - this.startPort = port; - return this; - } - - public GatewayReceiverFactory setEndPort(int port) { - this.endPort = port; - return this; - } - - public GatewayReceiverFactory setSocketBufferSize(int size) { - this.socketBuffSize = size; - return this; - } - - public GatewayReceiverFactory setBindAddress(String address) { - this.bindAdd = address; - return this; - } - - public GatewayReceiverFactory setHostnameForSenders(String address) { - this.hostnameForSenders = address; - return this; - } - - public GatewayReceiverFactory setManualStart(boolean start) { - this.manualStart = start; - return this; - } - - public GatewayReceiver create() { - if (this.startPort > this.endPort) { - throw new IllegalStateException( - "Please specify either start port a value which is less than end port."); - } - GatewayReceiver recv = null; - if (this.cache instanceof GemFireCacheImpl) { - recv = new GatewayReceiverImpl(this.cache, this.startPort, this.endPort, - this.timeBetPings, this.socketBuffSize, this.bindAdd, this.filters, - this.hostnameForSenders, this.manualStart); - ((GemFireCacheImpl)cache).addGatewayReceiver(recv); - InternalDistributedSystem system = (InternalDistributedSystem) this.cache - .getDistributedSystem(); - system.handleResourceEvent(ResourceEvent.GATEWAYRECEIVER_CREATE, recv); - if (!this.manualStart) { - try { - recv.start(); - } - catch (IOException ioe) { - throw new GatewayReceiverException( - LocalizedStrings.GatewayReceiver_EXCEPTION_WHILE_STARTING_GATEWAY_RECEIVER - .toLocalizedString(), ioe); - } - } - } else if (this.cache instanceof CacheCreation) { - recv = new GatewayReceiverCreation(this.cache, this.startPort, this.endPort, - this.timeBetPings, this.socketBuffSize, this.bindAdd, this.filters, - this.hostnameForSenders, this.manualStart); - ((CacheCreation)cache).addGatewayReceiver(recv); - } - return recv; - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewayReceiverImpl.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewayReceiverImpl.java b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewayReceiverImpl.java deleted file mode 100644 index b8768d4..0000000 --- a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewayReceiverImpl.java +++ /dev/null @@ -1,251 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.gemstone.gemfire.internal.cache.wan; - -import java.io.IOException; -import java.net.BindException; -import java.net.SocketException; -import java.net.UnknownHostException; -import java.util.Arrays; -import java.util.List; - -import org.apache.logging.log4j.Logger; - -import com.gemstone.gemfire.cache.Cache; -import com.gemstone.gemfire.cache.server.CacheServer; -import com.gemstone.gemfire.cache.wan.GatewayReceiver; -import com.gemstone.gemfire.cache.wan.GatewayTransportFilter; -import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; -import com.gemstone.gemfire.distributed.internal.ResourceEvent; -import com.gemstone.gemfire.internal.AvailablePort; -import com.gemstone.gemfire.internal.net.SocketCreator; -import com.gemstone.gemfire.internal.cache.CacheServerImpl; -import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; -import com.gemstone.gemfire.internal.i18n.LocalizedStrings; -import com.gemstone.gemfire.internal.logging.LogService; -import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; - -/** - * @since GemFire 7.0 - */ -@SuppressWarnings("deprecation") -public class GatewayReceiverImpl implements GatewayReceiver { - - private static final Logger logger = LogService.getLogger(); - - private String host; - - private int startPort; - - private int endPort; - - private int port; - - private int timeBetPings; - - private int socketBufferSize; - - private boolean manualStart; - - private final List<GatewayTransportFilter> filters; - - private String bindAdd; - - private CacheServer receiver; - - private final GemFireCacheImpl cache; - - public GatewayReceiverImpl(Cache cache, int startPort, - int endPort, int timeBetPings, int buffSize, String bindAdd, - List<GatewayTransportFilter> filters, String hostnameForSenders, boolean manualStart) { - this.cache = (GemFireCacheImpl)cache; - - /* - * If user has set hostNameForSenders then it should take precedence over - * bindAddress. 
If user hasn't set either hostNameForSenders or bindAddress - * then getLocalHost().getHostName() should be used. - */ - if (hostnameForSenders == null || hostnameForSenders.isEmpty()) { - if (bindAdd == null || bindAdd.isEmpty()) { - try { - logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayReceiverImpl_USING_LOCAL_HOST)); - this.host = SocketCreator.getLocalHost().getHostName(); - } catch (UnknownHostException e) { - throw new IllegalStateException( - LocalizedStrings.GatewayReceiverImpl_COULD_NOT_GET_HOST_NAME - .toLocalizedString(), - e); - } - } else { - this.host = bindAdd; - } - } else { - this.host = hostnameForSenders; - } - - this.startPort = startPort; - this.endPort = endPort; - this.timeBetPings = timeBetPings; - this.socketBufferSize = buffSize; - this.bindAdd = bindAdd; - this.filters = filters; - this.manualStart = manualStart; - } - - public List<GatewayTransportFilter> getGatewayTransportFilters() { - return this.filters; - } - - public int getMaximumTimeBetweenPings() { - return this.timeBetPings; - } - - public int getPort() { - return this.port; - } - - public int getStartPort() { - return this.startPort; - } - - public int getEndPort() { - return this.endPort; - } - - public int getSocketBufferSize() { - return this.socketBufferSize; - } - - public boolean isManualStart() { - return this.manualStart; - } - - public CacheServer getServer() { - return receiver; - } - - public void start() throws IOException { - if (receiver == null) { - receiver = this.cache.addCacheServer(true); - } - if (receiver.isRunning()) { - logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayReceiver_IS_ALREADY_RUNNING)); - return; - } - boolean started = false; - this.port = getPortToStart(); - while (!started && this.port != -1) { - receiver.setPort(this.port); - receiver.setSocketBufferSize(socketBufferSize); - receiver.setMaximumTimeBetweenPings(timeBetPings); - receiver.setHostnameForClients(host); - receiver.setBindAddress(bindAdd); - 
receiver.setGroups(new String[] { GatewayReceiverImpl.RECEIVER_GROUP }); - ((CacheServerImpl)receiver).setGatewayTransportFilter(this.filters); - try { - ((CacheServerImpl)receiver).start(); - started = true; - } catch (BindException be) { - if (be.getCause() != null - && be.getCause().getMessage() - .contains("assign requested address")) { - throw new GatewayReceiverException( - LocalizedStrings.SocketCreator_FAILED_TO_CREATE_SERVER_SOCKET_ON_0_1 - .toLocalizedString(new Object[] { bindAdd, - Integer.valueOf(this.port) })); - } - // ignore as this port might have been used by other threads. - logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayReceiver_Address_Already_In_Use, this.port)); - this.port = getPortToStart(); - } catch (SocketException se) { - if (se.getMessage().contains("Address already in use")) { - logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayReceiver_Address_Already_In_Use, this.port)); - this.port = getPortToStart(); - - } else { - throw se; - } - } - - } - if (!started) { - throw new IllegalStateException( - "No available free port found in the given range."); - } - logger.info(LocalizedMessage.create(LocalizedStrings.GatewayReceiver_STARTED_ON_PORT, this.port)); - - InternalDistributedSystem system = this.cache.getDistributedSystem(); - system.handleResourceEvent(ResourceEvent.GATEWAYRECEIVER_START, this); - - } - - private int getPortToStart(){ - // choose a random port from the given port range - int rPort; - if (this.startPort == this.endPort) { - rPort = this.startPort; - } else { - rPort = AvailablePort.getRandomAvailablePortInRange(this.startPort, - this.endPort, AvailablePort.SOCKET); - } - return rPort; - } - - public void stop() { - if(!isRunning()){ - throw new GatewayReceiverException(LocalizedStrings.GatewayReceiver_IS_NOT_RUNNING.toLocalizedString()); - } - receiver.stop(); - -// InternalDistributedSystem system = ((GemFireCacheImpl) this.cache) -// .getDistributedSystem(); -// 
system.handleResourceEvent(ResourceEvent.GATEWAYRECEIVER_STOP, this); - - } - - public String getHost() { - return this.host; - } - - public String getBindAddress() { - return this.bindAdd; - } - - public boolean isRunning() { - if (this.receiver != null) { - return this.receiver.isRunning(); - } - return false; - } - - public String toString() { - return new StringBuffer() - .append("Gateway Receiver") - .append("@").append(Integer.toHexString(hashCode())) - .append(" [") - .append("host='").append(getHost()) - .append("'; port=").append(getPort()) - .append("; bindAddress=").append(getBindAddress()) - .append("; maximumTimeBetweenPings=").append(getMaximumTimeBetweenPings()) - .append("; socketBufferSize=").append(getSocketBufferSize()) - .append("; isManualStart=").append(isManualStart()) - .append("; group=").append(Arrays.toString(new String[]{GatewayReceiverImpl.RECEIVER_GROUP})) - .append("]") - .toString(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java deleted file mode 100644 index d2302c4..0000000 --- a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java +++ /dev/null @@ -1,802 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.wan; - - -import java.io.IOException; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import com.gemstone.gemfire.GemFireIOException; -import com.gemstone.gemfire.internal.cache.tier.sockets.MessageTooLargeException; -import org.apache.logging.log4j.Logger; - -import com.gemstone.gemfire.CancelException; -import com.gemstone.gemfire.cache.Cache; -import com.gemstone.gemfire.cache.RegionDestroyedException; -import com.gemstone.gemfire.cache.client.ServerConnectivityException; -import com.gemstone.gemfire.cache.client.ServerOperationException; -import com.gemstone.gemfire.cache.client.internal.Connection; -import com.gemstone.gemfire.cache.client.internal.pooling.ConnectionDestroyedException; -import com.gemstone.gemfire.cache.wan.GatewaySender; -import com.gemstone.gemfire.distributed.internal.ServerLocation; -import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; -import com.gemstone.gemfire.internal.cache.UpdateAttributesProcessor; -import com.gemstone.gemfire.internal.i18n.LocalizedStrings; -import com.gemstone.gemfire.internal.logging.LogService; -import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; -import com.gemstone.gemfire.pdx.PdxRegistryMismatchException; -import com.gemstone.gemfire.security.GemFireSecurityException; -import com.gemstone.gemfire.cache.client.internal.SenderProxy; - -/** - * @since GemFire 7.0 - * - */ -public class GatewaySenderEventRemoteDispatcher implements - 
GatewaySenderEventDispatcher { - - private static final Logger logger = LogService.getLogger(); - - private final AbstractGatewaySenderEventProcessor processor; - - private volatile Connection connection; - - private final Set<String> notFoundRegions = new HashSet<String>(); - - private final Object notFoundRegionsSync = new Object(); - - private final AbstractGatewaySender sender; - - private AckReaderThread ackReaderThread; - - private ReentrantReadWriteLock connectionLifeCycleLock = new ReentrantReadWriteLock(); - - /** - * This count is reset to 0 each time a successful connection is made. - */ - private int failedConnectCount = 0; - - public GatewaySenderEventRemoteDispatcher(AbstractGatewaySenderEventProcessor eventProcessor) { - this.processor = eventProcessor; - this.sender = eventProcessor.getSender(); -// this.ackReaderThread = new AckReaderThread(sender); - try { - initializeConnection(); - } - catch (GatewaySenderException e) { - if (e.getCause() instanceof GemFireSecurityException) { - throw e; - } - } - } - - protected GatewayAck readAcknowledgement() { - SenderProxy sp = new SenderProxy(this.processor.getSender().getProxy()); - GatewayAck ack = null; - Exception ex; - try { - connection = getConnection(false); - if (logger.isDebugEnabled()) { - logger.debug(" Receiving ack on the thread {}", connection); - } - this.connectionLifeCycleLock.readLock().lock(); - try { - if (connection != null) { - ack = (GatewayAck)sp.receiveAckFromReceiver(connection); - } - } finally { - this.connectionLifeCycleLock.readLock().unlock(); - } - - } catch (Exception e) { - Throwable t = e.getCause(); - if (t instanceof BatchException70) { - // A BatchException has occurred. - // Do not process the connection as dead since it is not dead. 
- ex = (BatchException70)t; - } else if (e instanceof GatewaySenderException) { //This Exception is thrown from getConnection - ex = (Exception) e.getCause(); - }else { - ex = e; - // keep using the connection if we had a batch exception. Else, destroy - // it - destroyConnection(); - } - if (this.sender.getProxy() == null || this.sender.getProxy().isDestroyed()) { - // if our pool is shutdown then just be silent - } else if (ex instanceof IOException - || (ex instanceof ServerConnectivityException && !(ex.getCause() instanceof PdxRegistryMismatchException)) - || ex instanceof ConnectionDestroyedException) { - // If the cause is an IOException or a ServerException, sleep and retry. - // Sleep for a bit and recheck. - try { - Thread.sleep(100); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - } - } else { - if (!(ex instanceof CancelException)) { - logger.fatal(LocalizedMessage.create( - LocalizedStrings.GatewayEventRemoteDispatcher_STOPPING_THE_PROCESSOR_BECAUSE_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_PROCESSING_A_BATCH), - ex); - } - this.processor.setIsStopped(true); - } - } - return ack; - } - - @Override - public boolean dispatchBatch(List events, boolean isRetry) { - GatewaySenderStats statistics = this.sender.getStatistics(); - boolean success = false; - try { - long start = statistics.startTime(); - success =_dispatchBatch(events, isRetry); - if (success) { - statistics.endBatch(start, events.size()); - } - } catch (GatewaySenderException ge) { - - Throwable t = ge.getCause(); - if (this.sender.getProxy() == null || this.sender.getProxy().isDestroyed()) { - // if our pool is shutdown then just be silent - } else if (t instanceof IOException - || t instanceof ServerConnectivityException - || t instanceof ConnectionDestroyedException - || t instanceof MessageTooLargeException - || t instanceof IllegalStateException) { - this.processor.handleException(); - // If the cause is an IOException or a ServerException, sleep and retry. 
- // Sleep for a bit and recheck. - try { - Thread.sleep(100); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - } - if (logger.isDebugEnabled()) { - logger.debug("Because of IOException, failed to dispatch a batch with id : {}", this.processor.getBatchId()); - } - } - else { - logger.fatal(LocalizedMessage.create( - LocalizedStrings.GatewayEventRemoteDispatcher_STOPPING_THE_PROCESSOR_BECAUSE_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_PROCESSING_A_BATCH), ge); - this.processor.setIsStopped(true); - } - } - catch (CancelException e) { - if (logger.isDebugEnabled()) { - logger.debug("Stopping the processor because cancellation occurred while processing a batch"); - } - this.processor.setIsStopped(true); - throw e; - } catch (Exception e) { - this.processor.setIsStopped(true); - logger.fatal(LocalizedMessage.create( - LocalizedStrings.GatewayEventRemoteDispatcher_STOPPING_THE_PROCESSOR_BECAUSE_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_PROCESSING_A_BATCH), - e); - } - return success; - } - - private boolean _dispatchBatch(List events, boolean isRetry) { - Exception ex = null; - int currentBatchId = this.processor.getBatchId(); - connection = getConnection(true); - int batchIdForThisConnection = this.processor.getBatchId(); - GatewaySenderStats statistics = this.sender.getStatistics(); - // This means we are writing to a new connection than the previous batch. - // i.e The connection has been reset. It also resets the batchId. 
- if (currentBatchId != batchIdForThisConnection - || this.processor.isConnectionReset()) { - return false; - } - try { - if (this.processor.isConnectionReset()) { - isRetry = true; - } - SenderProxy sp = new SenderProxy(this.sender.getProxy()); - this.connectionLifeCycleLock.readLock().lock(); - try { - if (connection != null) { - sp.dispatchBatch_NewWAN(connection, events, currentBatchId, isRetry); - if (logger.isDebugEnabled()) { - logger.debug("{} : Dispatched batch (id={}) of {} events, queue size: {} on connection {}", - this.processor.getSender(), currentBatchId, events.size(), this.processor.getQueue().size(), connection); - } - } else { - throw new ConnectionDestroyedException(); - } - } - finally{ - this.connectionLifeCycleLock.readLock().unlock(); - } - return true; - } - catch (ServerOperationException e) { - Throwable t = e.getCause(); - if (t instanceof BatchException70) { - // A BatchException has occurred. - // Do not process the connection as dead since it is not dead. - ex = (BatchException70)t; - } - else { - ex = e; - // keep using the connection if we had a batch exception. Else, destroy it - destroyConnection(); - } - throw new GatewaySenderException( - LocalizedStrings.GatewayEventRemoteDispatcher_0_EXCEPTION_DURING_PROCESSING_BATCH_1_ON_CONNECTION_2.toLocalizedString( - new Object[] {this, Integer.valueOf(currentBatchId), connection}), ex); - } - catch (GemFireIOException e) { - Throwable t = e.getCause(); - if (t instanceof MessageTooLargeException) { - // A MessageTooLargeException has occurred. - // Do not process the connection as dead since it is not dead. 
- ex = (MessageTooLargeException)t; - // Reduce the batch size by half of the configured batch size or number of events in the current batch (whichever is less) - int newBatchSize = Math.min(events.size(), this.processor.getBatchSize())/2; - logger.warn(LocalizedMessage.create( - LocalizedStrings.GatewaySenderEventRemoteDispatcher_MESSAGE_TOO_LARGE_EXCEPTION, new Object[] { events.size(), newBatchSize }), e); - this.processor.setBatchSize(newBatchSize); - statistics.incBatchesResized(); - } - else { - ex = e; - // keep using the connection if we had a MessageTooLargeException. Else, destroy it - destroyConnection(); - } - throw new GatewaySenderException( - LocalizedStrings.GatewayEventRemoteDispatcher_0_EXCEPTION_DURING_PROCESSING_BATCH_1_ON_CONNECTION_2.toLocalizedString( - new Object[] {this, Integer.valueOf(currentBatchId), connection}), ex); - } - catch (IllegalStateException e) { - this.processor.setException(new GatewaySenderException(e)); - throw new GatewaySenderException( - LocalizedStrings.GatewayEventRemoteDispatcher_0_EXCEPTION_DURING_PROCESSING_BATCH_1_ON_CONNECTION_2.toLocalizedString( - new Object[] {this, Integer.valueOf(currentBatchId), connection}), e); - } - catch (Exception e) { - // An Exception has occurred. Get its cause. - Throwable t = e.getCause(); - if (t instanceof IOException) { - // An IOException has occurred. - ex = (IOException)t; - } else { - ex = e; - } - //the cause is not going to be BatchException70. 
So, destroy the connection - destroyConnection(); - - throw new GatewaySenderException( - LocalizedStrings.GatewayEventRemoteDispatcher_0_EXCEPTION_DURING_PROCESSING_BATCH_1_ON_CONNECTION_2.toLocalizedString( - new Object[] {this, Integer.valueOf(currentBatchId), connection}), ex); - } - } - - /** - * Acquires or adds a new <code>Connection</code> to the corresponding - * <code>Gateway</code> - * - * @return the <code>Connection</code> - * - * @throws GatewaySenderException - */ - public Connection getConnection(boolean startAckReaderThread) throws GatewaySenderException{ - if (this.processor.isStopped()) { - return null; - } - // IF the connection is null - // OR the connection's ServerLocation doesn't match with the one stored in sender - // THEN initialize the connection - if(!this.sender.isParallel()) { - if (this.connection == null || this.connection.isDestroyed() - || !this.connection.getServer().equals(this.sender.getServerLocation())) { - if (logger.isDebugEnabled()) { - logger.debug("Initializing new connection as serverLocation of old connection is : {} and the serverLocation to connect is {}", - ((this.connection == null) ? 
"null" : this.connection.getServer()), - this.sender.getServerLocation()); - } - // Initialize the connection - initializeConnection(); - } - } else { - if (this.connection == null || this.connection.isDestroyed()) { - initializeConnection(); - } - } - - // Here we might wait on a connection to another server if I was secondary - // so don't start waiting until I am primary - Cache cache = this.sender.getCache(); - if (cache != null && !cache.isClosed()) { - if (this.sender.isPrimary() && (this.connection != null)) { - if (this.ackReaderThread == null || !this.ackReaderThread.isRunning()) { - this.ackReaderThread = new AckReaderThread(this.sender, this.processor); - this.ackReaderThread.start(); - this.ackReaderThread.waitForRunningAckReaderThreadRunningState(); - } - } - } - return this.connection; - } - - public void destroyConnection() { - this.connectionLifeCycleLock.writeLock().lock(); - try { - Connection con = this.connection; - if (con != null) { - if (!con.isDestroyed()) { - con.destroy(); - this.sender.getProxy().returnConnection(con); - } - - // Reset the connection so the next time through a new one will be - // obtained - this.connection = null; - this.sender.setServerLocation(null); - } - } - finally { - this.connectionLifeCycleLock.writeLock().unlock(); - } - } - - /** - * Initializes the <code>Connection</code>. - * - * @throws GatewaySenderException - */ - private void initializeConnection() throws GatewaySenderException, - GemFireSecurityException { - this.connectionLifeCycleLock.writeLock().lock(); - try { - // Attempt to acquire a connection - if (this.sender.getProxy() == null - || this.sender.getProxy().isDestroyed()) { - this.sender.initProxy(); - } else { - this.processor.resetBatchId(); - } - Connection con; - try { - if (this.sender.isParallel()) { - /* - * TODO - The use of acquireConnection should be removed - * from the gateway code. 
This method is fine for tests, - * but these connections should really be managed inside - * the pool code. If the gateway needs to persistent connection - * to a single server, which should create have the OpExecutor - * that holds a reference to the connection (similar to the way - * we do with thread local connections). - * Use {@link ExecutablePool#setupServerAffinity(boolean)} for - * gateway code - */ - con = this.sender.getProxy().acquireConnection(); - // For parallel sender, setting server location will not matter. - // everytime it will ask for acquire connection whenever it needs it. I - // am saving this server location for command purpose - sender.setServerLocation(con.getServer()); - } else { - synchronized (this.sender - .getLockForConcurrentDispatcher()) { - ServerLocation server = this.sender.getServerLocation(); - if (server != null) { - if (logger.isDebugEnabled()) { - logger.debug("ServerLocation is: {}. Connecting to this serverLocation...", server); - } - con = this.sender.getProxy().acquireConnection(server); - } else { - if (logger.isDebugEnabled()) { - logger.debug("ServerLocation is null. Creating new connection. "); - } - con = this.sender.getProxy().acquireConnection(); - // Acquired connection from pool!! 
Update the server location - // information in the sender and - // distribute the information to other senders ONLY IF THIS SENDER - // IS - // PRIMARY - if (this.sender.isPrimary()) { - if (sender.getServerLocation() == null) { - sender.setServerLocation(con.getServer()); - } - new UpdateAttributesProcessor(this.sender).distribute(false); - } - } - } - } - } catch (ServerConnectivityException e) { - this.failedConnectCount++; - Throwable ex = null; - - if (e.getCause() instanceof GemFireSecurityException) { - ex = e.getCause(); - if (logConnectionFailure()) { - // only log this message once; another msg is logged once we connect - logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayEventRemoteDispatcher_0_COULD_NOT_CONNECT_1, - new Object[] { this.processor.getSender().getId(), ex.getMessage() })); - } - throw new GatewaySenderException(ex); - } - List<ServerLocation> servers = this.sender.getProxy() - .getCurrentServers(); - String ioMsg = null; - if (servers.size() == 0) { - ioMsg = LocalizedStrings.GatewayEventRemoteDispatcher_THERE_ARE_NO_ACTIVE_SERVERS - .toLocalizedString(); - } else { - final StringBuilder buffer = new StringBuilder(); - for (ServerLocation server : servers) { - String endpointName = String.valueOf(server); - if (buffer.length() > 0) { - buffer.append(", "); - } - buffer.append(endpointName); - } - ioMsg = LocalizedStrings.GatewayEventRemoteDispatcher_NO_AVAILABLE_CONNECTION_WAS_FOUND_BUT_THE_FOLLOWING_ACTIVE_SERVERS_EXIST_0 - .toLocalizedString(buffer.toString()); - } - ex = new IOException(ioMsg); - // Set the serverLocation to null so that a new connection can be - // obtained in next attempt - this.sender.setServerLocation(null); - if (this.failedConnectCount == 1) { - // only log this message once; another msg is logged once we connect - logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayEventRemoteDispatcher__0___COULD_NOT_CONNECT, - this.processor.getSender().getId())); - - } - // Wrap the IOException in a 
GatewayException so it can be processed the - // same as the other exceptions that might occur in sendBatch. - throw new GatewaySenderException( - LocalizedStrings.GatewayEventRemoteDispatcher__0___COULD_NOT_CONNECT - .toLocalizedString(this.processor.getSender().getId()), ex); - } - if (this.failedConnectCount > 0) { - Object[] logArgs = new Object[] { this.processor.getSender().getId(), - con, Integer.valueOf(this.failedConnectCount) }; - logger.info(LocalizedMessage.create(LocalizedStrings.GatewayEventRemoteDispatcher_0_USING_1_AFTER_2_FAILED_CONNECT_ATTEMPTS, - logArgs)); - this.failedConnectCount = 0; - } else { - Object[] logArgs = new Object[] { this.processor.getSender().getId(), - con }; - logger.info(LocalizedMessage.create(LocalizedStrings.GatewayEventRemoteDispatcher_0_USING_1, logArgs)); - } - this.connection = con; - this.processor.checkIfPdxNeedsResend(this.connection.getQueueStatus().getPdxSize()); - } - catch (ConnectionDestroyedException e) { - throw new GatewaySenderException( - LocalizedStrings.GatewayEventRemoteDispatcher__0___COULD_NOT_CONNECT.toLocalizedString(this.processor - .getSender().getId()), e); - } - finally { - this.connectionLifeCycleLock.writeLock().unlock(); - } - } - - protected boolean logConnectionFailure() { - // always log the first failure - if (logger.isDebugEnabled() || this.failedConnectCount == 0) { - return true; - } - else { - // subsequent failures will be logged on 30th, 300th, 3000th try - // each try is at 100millis from higher layer so this accounts for logging - // after 3s, 30s and then every 5mins - if (this.failedConnectCount >= 3000) { - return (this.failedConnectCount % 3000) == 0; - } - else { - return (this.failedConnectCount == 30 || this.failedConnectCount == 300); - } - } - } - - public static class GatewayAck { - private int batchId; - - private int numEvents; - - private BatchException70 be; - - public GatewayAck(BatchException70 be, int bId) { - this.be = be; - this.batchId = bId; - } - - public 
GatewayAck(int batchId, int numEvents) { - this.batchId = batchId; - this.numEvents = numEvents; - } - - /** - * @return the numEvents - */ - public int getNumEvents() { - return numEvents; - } - - /** - * @return the batchId - */ - public int getBatchId() { - return batchId; - } - - public BatchException70 getBatchException() { - return this.be; - } - } - - class AckReaderThread extends Thread { - - private Object runningStateLock = new Object(); - - /** - * boolean to make a shutdown request - */ - private volatile boolean shutdown = false; - - private final GemFireCacheImpl cache; - - private volatile boolean ackReaderThreadRunning = false; - - public AckReaderThread(GatewaySender sender, AbstractGatewaySenderEventProcessor processor) { - super("AckReaderThread for : " + processor.getName()); - this.setDaemon(true); - this.cache = (GemFireCacheImpl)((AbstractGatewaySender)sender).getCache(); - } - - public void waitForRunningAckReaderThreadRunningState() { - synchronized (runningStateLock) { - while (!this.ackReaderThreadRunning) { - try { - this.runningStateLock.wait(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - break; - } - } - } - } - - private boolean checkCancelled() { - if (shutdown) { - return true; - } - - if (cache.getCancelCriterion().isCancelInProgress()) { - return true; - } - return false; - } - - @Override - public void run() { - if (logger.isDebugEnabled()) { - logger.debug("AckReaderThread started.. "); - } - - synchronized (runningStateLock) { - ackReaderThreadRunning = true; - this.runningStateLock.notifyAll(); - } - - try { - for (;;) { - if (checkCancelled()) { - break; - } - GatewayAck ack = readAcknowledgement(); - if (ack != null) { - boolean gotBatchException = ack.getBatchException() != null; - int batchId = ack.getBatchId(); - int numEvents = ack.getNumEvents(); - - // If the batch is successfully processed, remove it from the - // queue. 
- if (gotBatchException) { - logger.info(LocalizedMessage.create( - LocalizedStrings.GatewaySenderEventRemoteDispatcher_GATEWAY_SENDER_0_RECEIVED_ACK_FOR_BATCH_ID_1_WITH_EXCEPTION, - new Object[] { processor.getSender(), ack.getBatchId() }, ack.getBatchException())); - // If we get PDX related exception in the batch exception then try - // to resend all the pdx events as well in the next batch. - final GatewaySenderStats statistics = sender.getStatistics(); - statistics.incBatchesRedistributed(); - // log batch exceptions and remove all the events if remove from - // exception is true - // do not remove if it is false - logBatchExceptions(ack.getBatchException()); - processor.handleSuccessBatchAck(batchId); - - } // unsuccessful batch - else { // The batch was successful. - if (logger.isDebugEnabled()) { - logger.debug("Gateway Sender {} : Received ack for batch id {} of {} events", - processor.getSender(), ack.getBatchId(), ack.getNumEvents()); - } - processor.handleSuccessBatchAck(batchId); - } - } else { - // If we have received IOException. - if (logger.isDebugEnabled()) { - logger.debug("{}: Received null ack from remote site.", processor.getSender()); - } - processor.handleException(); - try { // This wait is before trying to getting new connection to - // receive ack. 
Without this there will be continuous call to - // getConnection - Thread.sleep(GatewaySender.CONNECTION_RETRY_INTERVAL); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } - } catch (Exception e) { - if (!checkCancelled()) { - logger.fatal(LocalizedMessage.create( - LocalizedStrings.GatewayEventRemoteDispatcher_STOPPING_THE_PROCESSOR_BECAUSE_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_PROCESSING_A_BATCH) ,e); - } - sender.getLifeCycleLock().writeLock().lock(); - try { - processor.stopProcessing(); - sender.clearTempEventsAfterSenderStopped(); - } finally { - sender.getLifeCycleLock().writeLock().unlock(); - } - // destroyConnection(); - } finally { - if (logger.isDebugEnabled()) { - logger.debug("AckReaderThread exiting. "); - } - ackReaderThreadRunning = false; - } - - } - - /** - * @param exception - * - */ - private void logBatchExceptions(BatchException70 exception) { - for (BatchException70 be : exception.getExceptions()) { - boolean logWarning = true; - if (be.getCause() instanceof RegionDestroyedException) { - RegionDestroyedException rde = (RegionDestroyedException)be - .getCause(); - synchronized (notFoundRegionsSync) { - if (notFoundRegions.contains(rde.getRegionFullPath())) { - logWarning = false; - } else { - notFoundRegions.add(rde.getRegionFullPath()); - } - } - } else if (be.getCause() instanceof IllegalStateException - && be.getCause().getMessage().contains("Unknown pdx type")) { - List<GatewaySenderEventImpl> pdxEvents = processor - .getBatchIdToPDXEventsMap().get(be.getBatchId()); - if (logWarning) { - logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayEventRemoteDispatcher_A_BATCHEXCEPTION_OCCURRED_PROCESSING_PDX_EVENT__0, - be.getIndex()), be); - } - if (pdxEvents != null) { - for (GatewaySenderEventImpl senderEvent : pdxEvents) { - senderEvent.isAcked = false; - } - GatewaySenderEventImpl gsEvent = pdxEvents.get(be.getIndex()); - if (logWarning) { - logger.warn(LocalizedMessage.create( - 
LocalizedStrings.GatewayEventRemoteDispatcher_THE_EVENT_BEING_PROCESSED_WHEN_THE_BATCHEXCEPTION_OCCURRED_WAS__0, gsEvent)); - } - } - continue; - } - if (logWarning) { - logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayEventRemoteDispatcher_A_BATCHEXCEPTION_OCCURRED_PROCESSING_EVENT__0, - be.getIndex()), be); - } - List<GatewaySenderEventImpl>[] eventsArr = processor.getBatchIdToEventsMap().get(be.getBatchId()); - if (eventsArr != null) { - List<GatewaySenderEventImpl> filteredEvents = eventsArr[1]; - GatewaySenderEventImpl gsEvent = (GatewaySenderEventImpl)filteredEvents - .get(be.getIndex()); - if (logWarning) { - logger.warn(LocalizedMessage.create( - LocalizedStrings.GatewayEventRemoteDispatcher_THE_EVENT_BEING_PROCESSED_WHEN_THE_BATCHEXCEPTION_OCCURRED_WAS__0, gsEvent)); - } - } - } - } - - boolean isRunning() { - return this.ackReaderThreadRunning; - } - - public void shutdown() { - // we need to destroy connection irrespective of we are listening on it or - // not. No need to take lock as the reader thread may be blocked and we might not - // get chance to destroy unless that returns. 
- if (connection != null) { - Connection conn = connection; - shutDownAckReaderConnection(); - if (!conn.isDestroyed()) { - conn.destroy(); - sender.getProxy().returnConnection(conn); - } - } - this.shutdown = true; - boolean interrupted = Thread.interrupted(); - try { - this.join(15 * 1000); - } catch (InterruptedException e) { - interrupted = true; - } finally { - if (interrupted) { - Thread.currentThread().interrupt(); - } - } - if (this.isAlive()) { - logger.warn(LocalizedMessage.create(LocalizedStrings.GatewaySender_ACKREADERTHREAD_IGNORED_CANCELLATION)); - } - } - - private void shutDownAckReaderConnection() { - Connection conn = connection; - //attempt to unblock the ackReader thread by shutting down the inputStream, if it was stuck on a read - try { - if (conn != null && conn.getInputStream() != null) { - conn.getInputStream().close(); - } - } catch (IOException e) { - logger.warn("Unable to shutdown AckReaderThread Connection"); - } catch (ConnectionDestroyedException e) { - logger.info("AckReader shutting down and connection already destroyed"); - } - - } - } - - public void stopAckReaderThread() { - if (this.ackReaderThread != null) { - this.ackReaderThread.shutdown(); - } - } - - @Override - public boolean isRemoteDispatcher() { - return true; - } - - @Override - public boolean isConnectedToRemote() { - return connection != null; - } - - public void stop() { - stopAckReaderThread(); - if(this.processor.isStopped()) { - destroyConnection(); - } - } -} - http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderFactoryImpl.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderFactoryImpl.java b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderFactoryImpl.java deleted file mode 100644 index 4974c6f..0000000 --- 
a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderFactoryImpl.java +++ /dev/null @@ -1,382 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.wan; - -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.logging.log4j.Logger; - -import com.gemstone.gemfire.cache.Cache; -import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener; -import com.gemstone.gemfire.cache.client.internal.LocatorDiscoveryCallback; -import com.gemstone.gemfire.cache.wan.GatewayEventFilter; -import com.gemstone.gemfire.cache.wan.GatewayEventSubstitutionFilter; -import com.gemstone.gemfire.cache.wan.GatewaySender; -import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy; -import com.gemstone.gemfire.cache.wan.GatewaySenderFactory; -import com.gemstone.gemfire.cache.wan.GatewayTransportFilter; -import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; -import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; -import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderImpl; -import com.gemstone.gemfire.internal.cache.wan.serial.SerialGatewaySenderImpl; -import 
com.gemstone.gemfire.internal.cache.xmlcache.CacheCreation; -import com.gemstone.gemfire.internal.cache.xmlcache.ParallelGatewaySenderCreation; -import com.gemstone.gemfire.internal.cache.xmlcache.SerialGatewaySenderCreation; -import com.gemstone.gemfire.internal.i18n.LocalizedStrings; -import com.gemstone.gemfire.internal.logging.LogService; -import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; - -/** - * - * @since GemFire 7.0 - * - */ -public class GatewaySenderFactoryImpl implements - InternalGatewaySenderFactory { - - private static final Logger logger = LogService.getLogger(); - - /** - * Used internally to pass the attributes from this factory to the real - * GatewaySender it is creating. - */ - private GatewaySenderAttributes attrs = new GatewaySenderAttributes(); - - private Cache cache; - - private static final AtomicBoolean GATEWAY_CONNECTION_READ_TIMEOUT_PROPERTY_CHECKED = new AtomicBoolean(false); - - public GatewaySenderFactoryImpl(Cache cache) { - this.cache = cache; - } - - public GatewaySenderFactory setParallel(boolean isParallel){ - this.attrs.isParallel = isParallel; - return this; - } - - public GatewaySenderFactory setForInternalUse(boolean isForInternalUse) { - this.attrs.isForInternalUse = isForInternalUse; - return this; - } - - public GatewaySenderFactory addGatewayEventFilter( - GatewayEventFilter filter) { - this.attrs.addGatewayEventFilter(filter); - return this; - } - - public GatewaySenderFactory addGatewayTransportFilter( - GatewayTransportFilter filter) { - this.attrs.addGatewayTransportFilter(filter); - return this; - } - - public GatewaySenderFactory addAsyncEventListener( - AsyncEventListener listener) { - this.attrs.addAsyncEventListener(listener); - return this; - } - - public GatewaySenderFactory setSocketBufferSize(int socketBufferSize) { - this.attrs.socketBufferSize = socketBufferSize; - return this; - } - - public GatewaySenderFactory setSocketReadTimeout(int socketReadTimeout) { - 
this.attrs.socketReadTimeout = socketReadTimeout; - return this; - } - - public GatewaySenderFactory setDiskStoreName(String diskStoreName) { - this.attrs.diskStoreName = diskStoreName; - return this; - } - - public GatewaySenderFactory setMaximumQueueMemory(int maximumQueueMemory) { - this.attrs.maximumQueueMemory = maximumQueueMemory; - return this; - } - - public GatewaySenderFactory setBatchSize(int batchSize) { - this.attrs.batchSize = batchSize; - return this; - } - - public GatewaySenderFactory setBatchTimeInterval(int batchTimeInterval) { - this.attrs.batchTimeInterval = batchTimeInterval; - return this; - } - - public GatewaySenderFactory setBatchConflationEnabled( - boolean enableBatchConflation) { - this.attrs.isBatchConflationEnabled = enableBatchConflation; - return this; - } - - public GatewaySenderFactory setPersistenceEnabled( - boolean enablePersistence) { - this.attrs.isPersistenceEnabled = enablePersistence; - return this; - } - - public GatewaySenderFactory setAlertThreshold(int threshold) { - this.attrs.alertThreshold = threshold; - return this; - } - - public GatewaySenderFactory setManualStart(boolean start) { - this.attrs.manualStart = start; - return this; - } - - public GatewaySenderFactory setLocatorDiscoveryCallback( - LocatorDiscoveryCallback locCallback) { - this.attrs.locatorDiscoveryCallback = locCallback; - return this; - } - - @Override - public GatewaySenderFactory setDiskSynchronous(boolean isSynchronous) { - this.attrs.isDiskSynchronous = isSynchronous; - return this; - } - - @Override - public GatewaySenderFactory setDispatcherThreads(int numThreads) { - if ((numThreads > 1) && this.attrs.policy == null) { - this.attrs.policy = GatewaySender.DEFAULT_ORDER_POLICY; - } - this.attrs.dispatcherThreads = numThreads; - return this; - } - - public GatewaySenderFactory setParallelFactorForReplicatedRegion(int parallel) { - this.attrs.parallelism = parallel; - this.attrs.policy = GatewaySender.DEFAULT_ORDER_POLICY; - return this; - } - 
- @Override - public GatewaySenderFactory setOrderPolicy(OrderPolicy policy) { - this.attrs.policy = policy; - return this; - } - - public GatewaySenderFactory setBucketSorted(boolean isBucketSorted){ - this.attrs.isBucketSorted = isBucketSorted; - return this; - } - public GatewaySender create(String id, int remoteDSId) { - int myDSId = InternalDistributedSystem.getAnyInstance() - .getDistributionManager().getDistributedSystemId(); - if (remoteDSId == myDSId) { - throw new GatewaySenderException( - LocalizedStrings.GatewaySenderImpl_GATEWAY_0_CANNOT_BE_CREATED_WITH_REMOTE_SITE_ID_EQUAL_TO_THIS_SITE_ID - .toLocalizedString(id)); - } - if (remoteDSId < 0) { - throw new GatewaySenderException( - LocalizedStrings.GatewaySenderImpl_GATEWAY_0_CANNOT_BE_CREATED_WITH_REMOTE_SITE_ID_LESS_THAN_ZERO - .toLocalizedString(id)); - } - this.attrs.id = id; - this.attrs.remoteDs = remoteDSId; - GatewaySender sender = null; - - if(this.attrs.getDispatcherThreads() <= 0){ - throw new GatewaySenderException( - LocalizedStrings.GatewaySenderImpl_GATEWAY_SENDER_0_CANNOT_HAVE_DISPATCHER_THREADS_LESS_THAN_1 - .toLocalizedString(id)); - } - - // Verify socket read timeout if a proper logger is available - if (this.cache instanceof GemFireCacheImpl) { - // If socket read timeout is less than the minimum, log a warning. - // Ideally, this should throw a GatewaySenderException, but wan dunit tests - // were failing, and we were running out of time to change them. - if (this.attrs.getSocketReadTimeout() != 0 - && this.attrs.getSocketReadTimeout() < GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT) { - logger.warn(LocalizedMessage.create(LocalizedStrings.Gateway_CONFIGURED_SOCKET_READ_TIMEOUT_TOO_LOW, - new Object[] { "GatewaySender " + id, this.attrs.getSocketReadTimeout(), GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT })); - this.attrs.socketReadTimeout = GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT; - } - - // Log a warning if the old system property is set. 
- if (GATEWAY_CONNECTION_READ_TIMEOUT_PROPERTY_CHECKED.compareAndSet(false, true)) { - if (System.getProperty(GatewaySender.GATEWAY_CONNECTION_READ_TIMEOUT_PROPERTY) != null) { - logger.warn(LocalizedMessage.create(LocalizedStrings.Gateway_OBSOLETE_SYSTEM_POPERTY, - new Object[] { GatewaySender.GATEWAY_CONNECTION_READ_TIMEOUT_PROPERTY, "GatewaySender socket read timeout" })); - } - } - } - - if (this.attrs.isParallel()) { -// if(this.attrs.getDispatcherThreads() != 1){ -// throw new GatewaySenderException( -// LocalizedStrings.GatewaySenderImpl_PARALLEL_GATEWAY_SENDER_0_CANNOT_BE_CREATED_WITH_DISPATCHER_THREADS_OTHER_THAN_1 -// .toLocalizedString(id)); -// } - if ((this.attrs.getOrderPolicy() != null) - && this.attrs.getOrderPolicy().equals(OrderPolicy.THREAD)) { - throw new GatewaySenderException( - LocalizedStrings.GatewaySenderImpl_PARALLEL_GATEWAY_SENDER_0_CANNOT_BE_CREATED_WITH_ORDER_POLICY_1 - .toLocalizedString(id, this.attrs.getOrderPolicy())); - } - if (this.cache instanceof GemFireCacheImpl) { - sender = new ParallelGatewaySenderImpl(this.cache, this.attrs); - ((GemFireCacheImpl)this.cache).addGatewaySender(sender); - - if (!this.attrs.isManualStart()) { - sender.start(); - } - } - else if (this.cache instanceof CacheCreation) { - sender = new ParallelGatewaySenderCreation(this.cache, this.attrs); - ((CacheCreation)this.cache).addGatewaySender(sender); - } - } - else { - if (this.attrs.getAsyncEventListeners().size() > 0) { - throw new GatewaySenderException( - LocalizedStrings.SerialGatewaySenderImpl_GATEWAY_0_CANNOT_DEFINE_A_REMOTE_SITE_BECAUSE_AT_LEAST_ONE_LISTENER_IS_ALREADY_ADDED - .toLocalizedString(id)); - } -// if (this.attrs.getOrderPolicy() != null) { -// if (this.attrs.getDispatcherThreads() == GatewaySender.DEFAULT_DISPATCHER_THREADS) { -// throw new GatewaySenderException( -// LocalizedStrings.SerialGatewaySender_INVALID_GATEWAY_SENDER_ORDER_POLICY_CONCURRENCY_0 -// .toLocalizedString(id)); -// } -// } - if (this.attrs.getOrderPolicy() == 
null && this.attrs.getDispatcherThreads() > 1) { - this.attrs.policy = GatewaySender.DEFAULT_ORDER_POLICY; - } - if (this.cache instanceof GemFireCacheImpl) { - sender = new SerialGatewaySenderImpl(this.cache, this.attrs); - ((GemFireCacheImpl)this.cache).addGatewaySender(sender); - - if (!this.attrs.isManualStart()) { - sender.start(); - } - } - else if (this.cache instanceof CacheCreation) { - sender = new SerialGatewaySenderCreation(this.cache, this.attrs); - ((CacheCreation)this.cache).addGatewaySender(sender); - } - } - return sender; - } - - public GatewaySender create(String id) { - this.attrs.id = id; - GatewaySender sender = null; - - if(this.attrs.getDispatcherThreads() <= 0) { - throw new AsyncEventQueueConfigurationException( - LocalizedStrings.AsyncEventQueue_0_CANNOT_HAVE_DISPATCHER_THREADS_LESS_THAN_1 - .toLocalizedString(id)); - } - - if (this.attrs.isParallel()) { - if ((this.attrs.getOrderPolicy() != null) - && this.attrs.getOrderPolicy().equals(OrderPolicy.THREAD)) { - throw new AsyncEventQueueConfigurationException( - LocalizedStrings.AsyncEventQueue_0_CANNOT_BE_CREATED_WITH_ORDER_POLICY_1 - .toLocalizedString(id, this.attrs.getOrderPolicy())); - } - - if (this.cache instanceof GemFireCacheImpl) { - sender = new ParallelGatewaySenderImpl(this.cache, this.attrs); - ((GemFireCacheImpl)this.cache).addGatewaySender(sender); - if (!this.attrs.isManualStart()) { - sender.start(); - } - } - else if (this.cache instanceof CacheCreation) { - sender = new ParallelGatewaySenderCreation(this.cache, this.attrs); - ((CacheCreation)this.cache).addGatewaySender(sender); - } - } - else { -// if (this.attrs.getOrderPolicy() != null) { -// if (this.attrs.getDispatcherThreads() == GatewaySender.DEFAULT_DISPATCHER_THREADS) { -// throw new AsyncEventQueueConfigurationException( -// LocalizedStrings.AsyncEventQueue_INVALID_ORDER_POLICY_CONCURRENCY_0 -// .toLocalizedString(id)); -// } -// } - if (this.attrs.getOrderPolicy() == null && this.attrs.getDispatcherThreads() 
> 1) { - this.attrs.policy = GatewaySender.DEFAULT_ORDER_POLICY; - } - if (this.cache instanceof GemFireCacheImpl) { - sender = new SerialGatewaySenderImpl(this.cache, this.attrs); - ((GemFireCacheImpl)this.cache).addGatewaySender(sender); - if (!this.attrs.isManualStart()) { - sender.start(); - } - } - else if (this.cache instanceof CacheCreation) { - sender = new SerialGatewaySenderCreation(this.cache, this.attrs); - ((CacheCreation)this.cache).addGatewaySender(sender); - } - } - return sender; - } - - public GatewaySenderFactory removeGatewayEventFilter( - GatewayEventFilter filter) { - this.attrs.eventFilters.remove(filter); - return this; - } - - public GatewaySenderFactory removeGatewayTransportFilter( - GatewayTransportFilter filter) { - this.attrs.transFilters.remove(filter); - return this; - } - - public GatewaySenderFactory setGatewayEventSubstitutionFilter( - GatewayEventSubstitutionFilter filter) { - this.attrs.eventSubstitutionFilter = filter; - return this; - } - - public void configureGatewaySender(GatewaySender senderCreation) { - this.attrs.isParallel = senderCreation.isParallel(); - this.attrs.manualStart = senderCreation.isManualStart(); - this.attrs.socketBufferSize = senderCreation.getSocketBufferSize(); - this.attrs.socketReadTimeout = senderCreation.getSocketReadTimeout(); - this.attrs.isBatchConflationEnabled = senderCreation.isBatchConflationEnabled(); - this.attrs.batchSize = senderCreation.getBatchSize(); - this.attrs.batchTimeInterval = senderCreation.getBatchTimeInterval(); - this.attrs.isPersistenceEnabled = senderCreation.isPersistenceEnabled(); - this.attrs.diskStoreName = senderCreation.getDiskStoreName(); - this.attrs.isDiskSynchronous = senderCreation.isDiskSynchronous(); - this.attrs.maximumQueueMemory = senderCreation.getMaximumQueueMemory(); - this.attrs.alertThreshold = senderCreation.getAlertThreshold(); - this.attrs.dispatcherThreads = senderCreation.getDispatcherThreads(); - this.attrs.policy = 
senderCreation.getOrderPolicy(); - for(GatewayEventFilter filter : senderCreation.getGatewayEventFilters()){ - this.attrs.eventFilters.add(filter); - } - for(GatewayTransportFilter filter : senderCreation.getGatewayTransportFilters()){ - this.attrs.transFilters.add(filter); - } - this.attrs.eventSubstitutionFilter = senderCreation.getGatewayEventSubstitutionFilter(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java deleted file mode 100644 index 322b1ba..0000000 --- a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java +++ /dev/null @@ -1,263 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.gemstone.gemfire.internal.cache.wan.parallel; -import org.apache.logging.log4j.Logger; - -import com.gemstone.gemfire.cache.Cache; -import com.gemstone.gemfire.cache.EntryOperation; -import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener; -import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl; -import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueStats; -import com.gemstone.gemfire.cache.wan.GatewaySender; -import com.gemstone.gemfire.cache.wan.GatewayTransportFilter; -import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile; -import com.gemstone.gemfire.distributed.internal.DistributionAdvisor; -import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; -import com.gemstone.gemfire.distributed.internal.ResourceEvent; -import com.gemstone.gemfire.internal.cache.DistributedRegion; -import com.gemstone.gemfire.internal.cache.EntryEventImpl; -import com.gemstone.gemfire.internal.cache.EventID; -import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; -import com.gemstone.gemfire.internal.cache.PartitionedRegionHelper; -import com.gemstone.gemfire.internal.cache.UpdateAttributesProcessor; -import com.gemstone.gemfire.internal.cache.ha.ThreadIdentifier; -import com.gemstone.gemfire.internal.cache.wan.AbstractRemoteGatewaySender; -import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor; -import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAdvisor; -import com.gemstone.gemfire.internal.cache.wan.GatewaySenderStats; -import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAdvisor.GatewaySenderProfile; -import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAttributes; -import com.gemstone.gemfire.internal.cache.xmlcache.CacheCreation; -import com.gemstone.gemfire.internal.i18n.LocalizedStrings; -import com.gemstone.gemfire.internal.logging.LogService; -import com.gemstone.gemfire.internal.logging.LoggingThreadGroup; -import 
com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; - -/** - * @since GemFire 7.0 - * - */ -public class ParallelGatewaySenderImpl extends AbstractRemoteGatewaySender { - - private static final Logger logger = LogService.getLogger(); - - final ThreadGroup loggerGroup = LoggingThreadGroup.createThreadGroup( - "Remote Site Discovery Logger Group", logger); - - public ParallelGatewaySenderImpl(){ - super(); - this.isParallel = true; - } - - public ParallelGatewaySenderImpl(Cache cache, GatewaySenderAttributes attrs) { - super(cache, attrs); - } - - @Override - public void start() { - this.getLifeCycleLock().writeLock().lock(); - try { - if (isRunning()) { - logger.warn(LocalizedMessage.create(LocalizedStrings.GatewaySender_SENDER_0_IS_ALREADY_RUNNING, this.getId())); - return; - } - - if (this.remoteDSId != DEFAULT_DISTRIBUTED_SYSTEM_ID) { - String locators = ((GemFireCacheImpl)this.cache).getDistributedSystem() - .getConfig().getLocators(); - if (locators.length() == 0) { - throw new IllegalStateException( - LocalizedStrings.AbstractGatewaySender_LOCATOR_SHOULD_BE_CONFIGURED_BEFORE_STARTING_GATEWAY_SENDER - .toLocalizedString()); - } - } - /* - * Now onwards all processing will happen through "ConcurrentParallelGatewaySenderEventProcessor" - * we have made "ParallelGatewaySenderEventProcessor" and "ParallelGatewaySenderQueue" as a - * utility classes of Concurrent version of processor and queue. 
- */ - eventProcessor = new RemoteConcurrentParallelGatewaySenderEventProcessor(this); - /*if (getDispatcherThreads() > 1) { - eventProcessor = new ConcurrentParallelGatewaySenderEventProcessor(this); - } else { - eventProcessor = new ParallelGatewaySenderEventProcessor(this); - }*/ - - eventProcessor.start(); - waitForRunningStatus(); - //Only notify the type registry if this is a WAN gateway queue - if(!isAsyncEventQueue()) { - ((GemFireCacheImpl) getCache()).getPdxRegistry().gatewaySenderStarted(this); - } - new UpdateAttributesProcessor(this).distribute(false); - - InternalDistributedSystem system = (InternalDistributedSystem) this.cache - .getDistributedSystem(); - system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_START, this); - - logger.info(LocalizedMessage.create(LocalizedStrings.ParallelGatewaySenderImpl_STARTED__0, this)); - - enqueueTempEvents(); - } - finally { - this.getLifeCycleLock().writeLock().unlock(); - } - } - -// /** -// * The sender is not started but only the message queue i.e. shadowPR is created on the node. 
-// * @param targetPr -// */ -// private void createMessageQueueOnAccessorNode(PartitionedRegion targetPr) { -// eventProcessor = new ParallelGatewaySenderEventProcessor(this, targetPr); -// } - - - @Override - public void stop() { - this.getLifeCycleLock().writeLock().lock(); - try { - if (!this.isRunning()) { - return; - } - // Stop the dispatcher - AbstractGatewaySenderEventProcessor ev = this.eventProcessor; - //try { - if (ev != null && !ev.isStopped()) { - ev.stopProcessing(); - } - - // Stop the proxy (after the dispatcher, so the socket is still - // alive until after the dispatcher has stopped) - stompProxyDead(); - - // Close the listeners - for (AsyncEventListener listener : this.listeners) { - listener.close(); - } - //stop the running threads, open sockets if any - ((ConcurrentParallelGatewaySenderQueue)this.eventProcessor.getQueue()).cleanUp(); - - logger.info(LocalizedMessage.create(LocalizedStrings.GatewayImpl_STOPPED__0, this)); - - InternalDistributedSystem system = (InternalDistributedSystem) this.cache - .getDistributedSystem(); - system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_STOP, this); - - clearTempEventsAfterSenderStopped(); - // Keep the eventProcessor around so we can ask it for the regionQueues later. - // Tests expect to be able to do this. 
-// } finally { -// this.eventProcessor = null; -// } - } - finally { - this.getLifeCycleLock().writeLock().unlock(); - } - } - - @Override - public String toString() { - StringBuffer sb = new StringBuffer(); - sb.append("ParallelGatewaySender{"); - sb.append("id=" + getId()); - sb.append(",remoteDsId="+ getRemoteDSId()); - sb.append(",isRunning ="+ isRunning()); - sb.append("}"); - return sb.toString(); - } - - public void fillInProfile(Profile profile) { - assert profile instanceof GatewaySenderProfile; - GatewaySenderProfile pf = (GatewaySenderProfile)profile; - pf.Id = getId(); - pf.remoteDSId = getRemoteDSId(); - pf.isRunning = isRunning(); - pf.isPrimary = isPrimary(); - pf.isParallel = true; - pf.isBatchConflationEnabled = isBatchConflationEnabled(); - pf.isPersistenceEnabled = isPersistenceEnabled(); - pf.alertThreshold = getAlertThreshold(); - pf.manualStart = isManualStart(); - pf.dispatcherThreads = getDispatcherThreads(); - pf.orderPolicy = getOrderPolicy(); - for (com.gemstone.gemfire.cache.wan.GatewayEventFilter filter : getGatewayEventFilters()) { - pf.eventFiltersClassNames.add(filter.getClass().getName()); - } - for (GatewayTransportFilter filter : getGatewayTransportFilters()) { - pf.transFiltersClassNames.add(filter.getClass().getName()); - } - for (AsyncEventListener listener : getAsyncEventListeners()) { - pf.senderEventListenerClassNames.add(listener.getClass().getName()); - } - pf.isDiskSynchronous = isDiskSynchronous(); - } - - /* (non-Javadoc) - * @see com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender#setModifiedEventId(com.gemstone.gemfire.internal.cache.EntryEventImpl) - */ - @Override - protected void setModifiedEventId(EntryEventImpl clonedEvent) { - int bucketId = -1; - //merged from 42004 - if (clonedEvent.getRegion() instanceof DistributedRegion) { -// if (getOrderPolicy() == OrderPolicy.THREAD) { -// bucketId = PartitionedRegionHelper.getHashKey( -// ((EntryEventImpl)clonedEvent).getEventId().getThreadID(), -// 
getMaxParallelismForReplicatedRegion()); -// } -// else - bucketId = PartitionedRegionHelper.getHashKey(clonedEvent.getKey(), - getMaxParallelismForReplicatedRegion()); - } - else { - bucketId = PartitionedRegionHelper - .getHashKey((EntryOperation)clonedEvent); - } - EventID originalEventId = clonedEvent.getEventId(); - long originatingThreadId = ThreadIdentifier.getRealThreadID(originalEventId.getThreadID()); - - long newThreadId = ThreadIdentifier - .createFakeThreadIDForParallelGSPrimaryBucket(bucketId, - originatingThreadId, getEventIdIndex()); - - // In case of parallel as all events go through primary buckets - // we don't need to generate different threadId for secondary buckets - // as they will be rejected if seen at PR level itself - -// boolean isPrimary = ((PartitionedRegion)getQueue().getRegion()) -// .getRegionAdvisor().getBucketAdvisor(bucketId).isPrimary(); -// if (isPrimary) { -// newThreadId = ThreadIdentifier -// .createFakeThreadIDForParallelGSPrimaryBucket(bucketId, -// originatingThreadId); -// } else { -// newThreadId = ThreadIdentifier -// .createFakeThreadIDForParallelGSSecondaryBucket(bucketId, -// originatingThreadId); -// } - - EventID newEventId = new EventID(originalEventId.getMembershipID(), - newThreadId, originalEventId.getSequenceID(), bucketId); - if (logger.isDebugEnabled()) { - logger.debug("{}: Generated event id for event with key={}, bucketId={}, original event id={}, threadId={}, new event id={}, newThreadId={}", - this, clonedEvent.getKey(), bucketId, originalEventId, originatingThreadId, newEventId, newThreadId); - } - clonedEvent.setEventId(newEventId); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/RemoteConcurrentParallelGatewaySenderEventProcessor.java ---------------------------------------------------------------------- diff --git 
a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/RemoteConcurrentParallelGatewaySenderEventProcessor.java b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/RemoteConcurrentParallelGatewaySenderEventProcessor.java deleted file mode 100644 index 35cdece..0000000 --- a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/RemoteConcurrentParallelGatewaySenderEventProcessor.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
// [patch context kept verbatim from the original line] - */
package com.gemstone.gemfire.internal.cache.wan.parallel;


import java.util.Set;

import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderStats;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventRemoteDispatcher;
import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderEventProcessor;

/**
 * Remote version of the concurrent parallel GatewaySenderEvent processor: its
 * processor array is populated with
 * {@link RemoteParallelGatewaySenderEventProcessor} instances.
 */
public class RemoteConcurrentParallelGatewaySenderEventProcessor
    extends ConcurrentParallelGatewaySenderEventProcessor {

  /**
   * Creates the processor for the given sender.
   *
   * @param sender the gateway sender, handed unchanged to the superclass constructor
   */
  public RemoteConcurrentParallelGatewaySenderEventProcessor(AbstractGatewaySender sender) {
    super(sender);
  }

  /**
   * Fills {@code processors} with one
   * {@link RemoteParallelGatewaySenderEventProcessor} per dispatcher thread
   * reported by the sender.
   *
   * @param dispatcherThreads not read by this implementation; the count is taken
   *        from {@code sender.getDispatcherThreads()} instead
   * @param targetRs the regions passed to every processor that is created
   */
  @Override
  protected void createProcessors(int dispatcherThreads, Set<Region> targetRs) {
    processors = new RemoteParallelGatewaySenderEventProcessor[sender.getDispatcherThreads()];
    if (logger.isDebugEnabled()) {
      logger.debug("Creating GatewaySenderEventProcessor");
    }
    for (int i = 0; i < sender.getDispatcherThreads(); i++) {
      processors[i] = new RemoteParallelGatewaySenderEventProcessor(sender,
          targetRs, i, sender.getDispatcherThreads());
    }
  }

  /**
   * For every processor whose dispatcher is a connected
   * {@link GatewaySenderEventRemoteDispatcher}, stops the ack reader thread and
   * then destroys the connection. The work is bracketed by the load-balance
   * statistics calls; the closing call sits in a {@code finally} so it runs even
   * if a dispatcher throws.
   */
  @Override
  protected void rebalance() {
    GatewaySenderStats statistics = this.sender.getStatistics();
    long startTime = statistics.startLoadBalance();
    try {
      for (ParallelGatewaySenderEventProcessor parallelProcessor : this.processors) {
        // Guard the cast: a processor may carry no dispatcher or a non-remote
        // one (RemoteParallelGatewaySenderEventProcessor only assigns a remote
        // dispatcher when the remote DS id is not the default). instanceof is
        // false for null, so both cases are skipped instead of throwing.
        Object dispatcher = parallelProcessor.getDispatcher();
        if (!(dispatcher instanceof GatewaySenderEventRemoteDispatcher)) {
          continue;
        }
        GatewaySenderEventRemoteDispatcher remoteDispatcher =
            (GatewaySenderEventRemoteDispatcher) dispatcher;
        if (remoteDispatcher.isConnectedToRemote()) {
          remoteDispatcher.stopAckReaderThread();
          remoteDispatcher.destroyConnection();
        }
      }
    } finally {
      statistics.endLoadBalance(startTime);
    }
  }
}
// [patch context kept verbatim from the original line] http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java
---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java deleted file mode 100644 index 815932e..0000000 --- a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
// [patch context kept verbatim from the original line] - */
package com.gemstone.gemfire.internal.cache.wan.parallel;

import java.io.IOException;
import java.util.Set;

import org.apache.logging.log4j.Logger;

import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.client.internal.Connection;
import com.gemstone.gemfire.cache.client.internal.pooling.ConnectionDestroyedException;
import com.gemstone.gemfire.cache.wan.GatewaySender;
import com.gemstone.gemfire.internal.Version;
import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderConfigurationException;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventCallbackDispatcher;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventDispatcher;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventRemoteDispatcher;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderException;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderStats;
import com.gemstone.gemfire.internal.logging.LogService;

/**
 * Parallel gateway sender event processor whose dispatcher is a
 * {@link GatewaySenderEventRemoteDispatcher} when the sender has a non-default
 * remote distributed system id.
 */
public class RemoteParallelGatewaySenderEventProcessor extends ParallelGatewaySenderEventProcessor {
  private static final Logger logger = LogService.getLogger();

  /**
   * Creates a processor for the given sender.
   *
   * @param sender the gateway sender, handed unchanged to the superclass constructor
   */
  protected RemoteParallelGatewaySenderEventProcessor(
      AbstractGatewaySender sender) {
    super(sender);
  }

  /**
   * Use in the concurrent scenario where the queue is to be shared among all
   * the processors. All arguments are handed unchanged to the superclass
   * constructor.
   *
   * @param sender the gateway sender
   * @param userRegions the regions this processor is created for
   * @param id index of this processor
   * @param nDispatcher total number of dispatchers
   */
  protected RemoteParallelGatewaySenderEventProcessor(AbstractGatewaySender sender,
      Set<Region> userRegions, int id, int nDispatcher) {
    super(sender, userRegions, id, nDispatcher);
  }

  /**
   * If the dispatcher reports itself as remote and is connected, stops its ack
   * reader thread and then destroys its connection. The work is bracketed by
   * the load-balance statistics calls; the closing call sits in a
   * {@code finally} so it runs even if the dispatcher throws.
   */
  @Override
  protected void rebalance() {
    GatewaySenderStats statistics = this.sender.getStatistics();
    long startTime = statistics.startLoadBalance();
    try {
      if (this.dispatcher.isRemoteDispatcher()) {
        GatewaySenderEventRemoteDispatcher remoteDispatcher =
            (GatewaySenderEventRemoteDispatcher) this.dispatcher;
        if (remoteDispatcher.isConnectedToRemote()) {
          remoteDispatcher.stopAckReaderThread();
          remoteDispatcher.destroyConnection();
        }
      }
    } finally {
      statistics.endLoadBalance(startTime);
    }
  }

  /**
   * Creates a {@link GatewaySenderEventRemoteDispatcher} for this processor
   * when the sender's remote distributed system id differs from
   * {@link GatewaySender#DEFAULT_DISTRIBUTED_SYSTEM_ID}. For the default id
   * this method assigns nothing.
   */
  public void initializeEventDispatcher() {
    if (logger.isDebugEnabled()) {
      logger.debug(" Creating the GatewayEventRemoteDispatcher");
    }
    if (this.sender.getRemoteDSId() != GatewaySender.DEFAULT_DISTRIBUTED_SYSTEM_ID) {
      this.dispatcher = new GatewaySenderEventRemoteDispatcher(this);
    }
  }

  /**
   * Tells whether the receiving WAN site of this GatewaySender runs GemFire
   * version 7.0.1 or later.
   *
   * <p>When obtaining the connection fails with a
   * {@link GatewaySenderConfigurationException}, or with a
   * {@link GatewaySenderException} caused by an {@link IOException} or a
   * {@link ConnectionDestroyedException}, the calling thread sleeps for
   * {@link GatewaySender#CONNECTION_RETRY_INTERVAL} milliseconds before the
   * exception is rethrown.
   *
   * @param disp the dispatcher to query; it is cast to
   *        {@link GatewaySenderEventRemoteDispatcher}
   * @return {@code true} if a connection is available and the remote site
   *         version is &gt;= 7.0.1, {@code false} otherwise
   * @throws GatewaySenderException rethrown unchanged when the connection
   *         cannot be obtained
   */
  private boolean shouldSendVersionEvents(GatewaySenderEventDispatcher disp)
      throws GatewaySenderException {
    try {
      GatewaySenderEventRemoteDispatcher remoteDispatcher =
          (GatewaySenderEventRemoteDispatcher) disp;
      // This will create a new connection if no batch has been sent till
      // now.
      Connection conn = remoteDispatcher.getConnection(false);
      if (conn != null) {
        short remoteSiteVersion = conn.getWanSiteVersion();
        if (Version.GFE_701.compareTo(remoteSiteVersion) <= 0) {
          return true;
        }
      }
    } catch (GatewaySenderException e) {
      Throwable cause = e.getCause();
      if (cause instanceof IOException
          || e instanceof GatewaySenderConfigurationException
          || cause instanceof ConnectionDestroyedException) {
        try {
          int sleepInterval = GatewaySender.CONNECTION_RETRY_INTERVAL;
          if (logger.isDebugEnabled()) {
            logger.debug("Sleeping for {} milliseconds", sleepInterval);
          }
          Thread.sleep(sleepInterval);
        } catch (InterruptedException ie) {
          // Thread.sleep clears the interrupt flag when it throws; restore it
          // so code further up the stack can still see that this thread was
          // asked to stop.
          Thread.currentThread().interrupt();
          // log the exception
          if (logger.isDebugEnabled()) {
            logger.debug(ie.getMessage(), ie);
          }
        }
      }
      throw e;
    }
    return false;
  }

}
// [patch context kept verbatim from the original lines: header of the next deleted file]
// http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/RemoteConcurrentSerialGatewaySenderEventProcessor.java
// ----------------------------------------------------------------------
// diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/RemoteConcurrentSerialGatewaySenderEventProcessor.java b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/RemoteConcurrentSerialGatewaySenderEventProcessor.java
// deleted file mode 100644
// index 8a25ab6..0000000
// --- a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/RemoteConcurrentSerialGatewaySenderEventProcessor.java
// +++ /dev/null
// @@ -1,45 +0,0 @@
// -/*
// - * Licensed to the Apache Software Foundation (ASF) under one or more
// - * contributor license agreements. See the NOTICE file distributed with
// - * this work for additional information regarding copyright ownership.
// - * The ASF licenses this file to You under the Apache License, Version 2.0
// - * (the "License"); you may not use this file except in compliance with
// - * the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.wan.serial; - -import org.apache.logging.log4j.Logger; - -import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender; -import com.gemstone.gemfire.internal.logging.LogService; - -public class RemoteConcurrentSerialGatewaySenderEventProcessor extends - ConcurrentSerialGatewaySenderEventProcessor { - - private static final Logger logger = LogService.getLogger(); - - public RemoteConcurrentSerialGatewaySenderEventProcessor( - AbstractGatewaySender sender) { - super(sender); - } - - @Override - protected void initializeMessageQueue(String id) { - for (int i = 0; i < sender.getDispatcherThreads(); i++) { - processors.add(new RemoteSerialGatewaySenderEventProcessor(this.sender, id - + "." 
+ i)); - if (logger.isDebugEnabled()) { - logger.debug("Created the RemoteSerialGatewayEventProcessor_{}->{}", i, processors.get(i)); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java deleted file mode 100644 index 82fa585..0000000 --- a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.gemstone.gemfire.internal.cache.wan.serial; - -import org.apache.logging.log4j.Logger; - -import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventRemoteDispatcher; -import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventCallbackDispatcher; -import com.gemstone.gemfire.cache.wan.GatewaySender; -import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender; -import com.gemstone.gemfire.internal.logging.LogService; - -public class RemoteSerialGatewaySenderEventProcessor extends - SerialGatewaySenderEventProcessor { - - private static final Logger logger = LogService.getLogger(); - public RemoteSerialGatewaySenderEventProcessor(AbstractGatewaySender sender, - String id) { - super(sender, id); - } - - public void initializeEventDispatcher() { - if (logger.isDebugEnabled()) { - logger.debug(" Creating the GatewayEventRemoteDispatcher"); - } - // In case of serial there is a way to create gatewaySender and attach - // asyncEventListener. Not sure of the use-case but there are dunit tests - // To make them pass uncommenting the below condition - if (this.sender.getRemoteDSId() != GatewaySender.DEFAULT_DISTRIBUTED_SYSTEM_ID) { - this.dispatcher = new GatewaySenderEventRemoteDispatcher(this); - }else{ - this.dispatcher = new GatewaySenderEventCallbackDispatcher(this); - } - } - -}
