http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java new file mode 100644 index 0000000..815932e --- /dev/null +++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteParallelGatewaySenderEventProcessor.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package com.gemstone.gemfire.internal.cache.wan.parallel;

import java.io.IOException;
import java.util.Set;

import org.apache.logging.log4j.Logger;

import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.client.internal.Connection;
import com.gemstone.gemfire.cache.client.internal.pooling.ConnectionDestroyedException;
import com.gemstone.gemfire.cache.wan.GatewaySender;
import com.gemstone.gemfire.internal.Version;
import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderConfigurationException;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventCallbackDispatcher;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventDispatcher;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventRemoteDispatcher;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderException;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderStats;
import com.gemstone.gemfire.internal.logging.LogService;

/**
 * Parallel gateway sender event processor that dispatches through a
 * {@link GatewaySenderEventRemoteDispatcher} when the sender has a remote
 * distributed system id configured.
 */
public class RemoteParallelGatewaySenderEventProcessor extends ParallelGatewaySenderEventProcessor {
  private static final Logger logger = LogService.getLogger();

  /**
   * Creates a processor for the given sender.
   *
   * @param sender the gateway sender this processor belongs to
   */
  protected RemoteParallelGatewaySenderEventProcessor(
      AbstractGatewaySender sender) {
    super(sender);
  }

  /**
   * Use in the concurrent scenario, where the queue is to be shared among all
   * the processors.
   *
   * @param sender the gateway sender this processor belongs to
   * @param userRegions passed through to the superclass constructor
   * @param id passed through to the superclass constructor
   * @param nDispatcher passed through to the superclass constructor
   */
  protected RemoteParallelGatewaySenderEventProcessor(AbstractGatewaySender sender, Set<Region> userRegions, int id, int nDispatcher) {
    super(sender, userRegions, id, nDispatcher);
  }

  /**
   * Stops the ack reader thread and destroys the current connection when the
   * dispatcher is a remote dispatcher that is connected to the remote site.
   * The elapsed time is always reported to the sender statistics, even if
   * tearing down the connection throws.
   */
  @Override
  protected void rebalance() {
    GatewaySenderStats statistics = this.sender.getStatistics();
    long startTime = statistics.startLoadBalance();
    try {
      if (this.dispatcher.isRemoteDispatcher()) {
        GatewaySenderEventRemoteDispatcher remoteDispatcher = (GatewaySenderEventRemoteDispatcher) this.dispatcher;
        if (remoteDispatcher.isConnectedToRemote()) {
          remoteDispatcher.stopAckReaderThread();
          remoteDispatcher.destroyConnection();
        }
      }
    } finally {
      statistics.endLoadBalance(startTime);
    }
  }

  /**
   * Creates the remote dispatcher, but only when the sender's remote
   * distributed system id differs from
   * {@link GatewaySender#DEFAULT_DISTRIBUTED_SYSTEM_ID}. Otherwise the
   * dispatcher field is left as it was.
   */
  public void initializeEventDispatcher() {
    if (logger.isDebugEnabled()) {
      logger.debug(" Creating the GatewayEventRemoteDispatcher");
    }
    if (this.sender.getRemoteDSId() != GatewaySender.DEFAULT_DISTRIBUTED_SYSTEM_ID) {
      this.dispatcher = new GatewaySenderEventRemoteDispatcher(this);
    }
  }

  /**
   * Returns whether the corresponding receiver WAN site of this GatewaySender
   * runs GemFire version 7.0.1 or later.
   *
   * @param disp the dispatcher to query; it is cast to
   *        {@link GatewaySenderEventRemoteDispatcher}
   * @return true if a connection is available and the remote site GemFire
   *         version is &gt;= 7.0.1, false otherwise
   * @throws GatewaySenderException if obtaining the connection fails. When the
   *         failure is a configuration problem, or is caused by an
   *         {@link IOException} or a {@link ConnectionDestroyedException},
   *         this method first sleeps for
   *         {@link GatewaySender#CONNECTION_RETRY_INTERVAL} milliseconds and
   *         then rethrows the same exception.
   */
  private boolean shouldSendVersionEvents(GatewaySenderEventDispatcher disp)
      throws GatewaySenderException {
    try {
      GatewaySenderEventRemoteDispatcher remoteDispatcher = (GatewaySenderEventRemoteDispatcher) disp;
      // This will create a new connection if no batch has been sent till
      // now.
      Connection conn = remoteDispatcher.getConnection(false);
      if (conn != null) {
        short remoteSiteVersion = conn.getWanSiteVersion();
        if (Version.GFE_701.compareTo(remoteSiteVersion) <= 0) {
          return true;
        }
      }
    } catch (GatewaySenderException e) {
      Throwable cause = e.getCause();
      if (cause instanceof IOException
          || e instanceof GatewaySenderConfigurationException
          || cause instanceof ConnectionDestroyedException) {
        try {
          int sleepInterval = GatewaySender.CONNECTION_RETRY_INTERVAL;
          if (logger.isDebugEnabled()) {
            logger.debug("Sleeping for {} milliseconds", sleepInterval);
          }
          Thread.sleep(sleepInterval);
        } catch (InterruptedException ie) {
          // Restore the interrupt status so code further up the stack can
          // still observe the interruption; the original failure is rethrown
          // below either way.
          Thread.currentThread().interrupt();
          if (logger.isDebugEnabled()) {
            logger.debug(ie.getMessage(), ie);
          }
        }
      }
      throw e;
    }
    return false;
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteConcurrentSerialGatewaySenderEventProcessor.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteConcurrentSerialGatewaySenderEventProcessor.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteConcurrentSerialGatewaySenderEventProcessor.java new file mode 100644 index 0000000..8a25ab6 --- /dev/null +++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteConcurrentSerialGatewaySenderEventProcessor.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package com.gemstone.gemfire.internal.cache.wan.serial;

import org.apache.logging.log4j.Logger;

import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
import com.gemstone.gemfire.internal.logging.LogService;

/**
 * Concurrent serial event processor whose per-thread workers are
 * {@link RemoteSerialGatewaySenderEventProcessor} instances.
 */
public class RemoteConcurrentSerialGatewaySenderEventProcessor extends
    ConcurrentSerialGatewaySenderEventProcessor {

  private static final Logger logger = LogService.getLogger();

  /**
   * @param sender the gateway sender this processor belongs to
   */
  public RemoteConcurrentSerialGatewaySenderEventProcessor(
      AbstractGatewaySender sender) {
    super(sender);
  }

  /**
   * Adds one {@link RemoteSerialGatewaySenderEventProcessor} per dispatcher
   * thread of the sender. Each is named {@code id + "." + index}.
   *
   * @param id prefix used to build the id of every created processor
   */
  @Override
  protected void initializeMessageQueue(String id) {
    int threadIndex = 0;
    while (threadIndex < sender.getDispatcherThreads()) {
      final String processorId = id + "." + threadIndex;
      processors.add(new RemoteSerialGatewaySenderEventProcessor(this.sender, processorId));
      if (logger.isDebugEnabled()) {
        logger.debug("Created the RemoteSerialGatewayEventProcessor_{}->{}", threadIndex, processors.get(threadIndex));
      }
      threadIndex++;
    }
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java new file mode 100644 index 0000000..82fa585 --- /dev/null +++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package com.gemstone.gemfire.internal.cache.wan.serial;

import org.apache.logging.log4j.Logger;

import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventRemoteDispatcher;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventCallbackDispatcher;
import com.gemstone.gemfire.cache.wan.GatewaySender;
import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
import com.gemstone.gemfire.internal.logging.LogService;

/**
 * Serial event processor that picks its dispatcher according to whether the
 * sender has a remote distributed system id configured.
 */
public class RemoteSerialGatewaySenderEventProcessor extends
    SerialGatewaySenderEventProcessor {

  private static final Logger logger = LogService.getLogger();

  /**
   * @param sender the gateway sender this processor belongs to
   * @param id passed through to the superclass constructor
   */
  public RemoteSerialGatewaySenderEventProcessor(AbstractGatewaySender sender,
      String id) {
    super(sender, id);
  }

  /**
   * Installs a {@link GatewaySenderEventRemoteDispatcher} when the sender's
   * remote distributed system id is not
   * {@link GatewaySender#DEFAULT_DISTRIBUTED_SYSTEM_ID}, and a
   * {@link GatewaySenderEventCallbackDispatcher} otherwise.
   */
  public void initializeEventDispatcher() {
    if (logger.isDebugEnabled()) {
      logger.debug(" Creating the GatewayEventRemoteDispatcher");
    }
    // A serial gateway sender can be created with an AsyncEventListener
    // attached instead of a remote site. The use-case is unclear, but dunit
    // tests depend on it, hence the callback-dispatcher fallback.
    final boolean noRemoteSite =
        this.sender.getRemoteDSId() == GatewaySender.DEFAULT_DISTRIBUTED_SYSTEM_ID;
    if (noRemoteSite) {
      this.dispatcher = new GatewaySenderEventCallbackDispatcher(this);
    } else {
      this.dispatcher = new GatewaySenderEventRemoteDispatcher(this);
    }
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java new file mode 100644 index 0000000..85e1bc0 --- /dev/null +++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package com.gemstone.gemfire.internal.cache.wan.serial;

import java.util.Set;

import org.apache.logging.log4j.Logger;

import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
import com.gemstone.gemfire.distributed.DistributedLockService;
import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.distributed.internal.ResourceEvent;
import com.gemstone.gemfire.internal.cache.EntryEventImpl;
import com.gemstone.gemfire.internal.cache.EventID;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.RegionQueue;
import com.gemstone.gemfire.internal.cache.UpdateAttributesProcessor;
import com.gemstone.gemfire.internal.cache.ha.ThreadIdentifier;
import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
import com.gemstone.gemfire.internal.cache.wan.AbstractRemoteGatewaySender;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAdvisor.GatewaySenderProfile;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAttributes;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderConfigurationException;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;

/**
 * Serial (non-parallel) gateway sender. On {@link #start()} it creates either
 * a single {@link RemoteSerialGatewaySenderEventProcessor} or, when more than
 * one dispatcher thread is configured, a
 * {@link RemoteConcurrentSerialGatewaySenderEventProcessor}.
 *
 * @since GemFire 7.0
 */
public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender {

  private static final Logger logger = LogService.getLogger();

  final ThreadGroup loggerGroup = LoggingThreadGroup.createThreadGroup(
      "Remote Site Discovery Logger Group", logger);

  /** Creates an unconfigured sender and marks it as not parallel. */
  public SerialGatewaySenderImpl() {
    super();
    this.isParallel = false;
  }

  /**
   * @param cache the cache this sender is created in
   * @param attrs the attributes used to configure this sender
   */
  public SerialGatewaySenderImpl(Cache cache,
      GatewaySenderAttributes attrs) {
    super(cache, attrs);
  }

  /**
   * Starts this sender while holding the life-cycle write lock. If the sender
   * is already running, a warning is logged and nothing else happens.
   * Otherwise the method:
   * <ol>
   * <li>requires locators to be configured when a remote distributed system
   * id is set;</li>
   * <li>initializes the distributed lock service and, if this member is not
   * already primary, volunteers for primary;</li>
   * <li>creates and starts the event processor and waits for the running
   * status;</li>
   * <li>notifies the PDX registry (WAN queues only), distributes the updated
   * profile, raises {@code GATEWAYSENDER_START} and enqueues temp
   * events.</li>
   * </ol>
   *
   * @throws GatewaySenderConfigurationException if a remote distributed system
   *         id is set but no locators are configured
   */
  @Override
  public void start() {
    if (logger.isDebugEnabled()) {
      logger.debug("Starting gatewaySender : {}", this);
    }

    this.getLifeCycleLock().writeLock().lock();
    try {
      if (isRunning()) {
        logger.warn(LocalizedMessage.create(LocalizedStrings.GatewaySender_SENDER_0_IS_ALREADY_RUNNING, this.getId()));
        return;
      }
      if (this.remoteDSId != DEFAULT_DISTRIBUTED_SYSTEM_ID) {
        String locators = ((GemFireCacheImpl)this.cache).getDistributedSystem()
            .getConfig().getLocators();
        if (locators.length() == 0) {
          throw new GatewaySenderConfigurationException(
              LocalizedStrings.AbstractGatewaySender_LOCATOR_SHOULD_BE_CONFIGURED_BEFORE_STARTING_GATEWAY_SENDER
                  .toLocalizedString());
        }
      }
      getSenderAdvisor().initDLockService();
      if (!isPrimary()) {
        if (getSenderAdvisor().volunteerForPrimary()) {
          getSenderAdvisor().makePrimary();
        } else {
          getSenderAdvisor().makeSecondary();
        }
      }
      if (getDispatcherThreads() > 1) {
        eventProcessor = new RemoteConcurrentSerialGatewaySenderEventProcessor(
            SerialGatewaySenderImpl.this);
      } else {
        eventProcessor = new RemoteSerialGatewaySenderEventProcessor(
            SerialGatewaySenderImpl.this, getId());
      }
      eventProcessor.start();
      waitForRunningStatus();
      this.startTime = System.currentTimeMillis();

      // Only notify the type registry if this is a WAN gateway queue
      if (!isAsyncEventQueue()) {
        ((GemFireCacheImpl) getCache()).getPdxRegistry().gatewaySenderStarted(this);
      }
      new UpdateAttributesProcessor(this).distribute(false);

      InternalDistributedSystem system = (InternalDistributedSystem) this.cache
          .getDistributedSystem();
      system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_START, this);

      logger.info(LocalizedMessage.create(LocalizedStrings.SerialGatewaySenderImpl_STARTED__0, this));

      enqueueTempEvents();
    } finally {
      this.getLifeCycleLock().writeLock().unlock();
    }
  }

  /**
   * Stops this sender. Under the life-cycle write lock it stops the event
   * processor, the proxy and the listeners and clears temp events. After the
   * lock is released it destroys the distributed lock service if this member
   * is primary, cleans up the queues, gives up primary status, distributes the
   * updated profile, waits up to three seconds for the lock-obtaining thread
   * and raises {@code GATEWAYSENDER_STOP}.
   */
  @Override
  public void stop() {
    if (logger.isDebugEnabled()) {
      logger.debug("Stopping Gateway Sender : {}", this);
    }
    this.getLifeCycleLock().writeLock().lock();
    try {
      // Stop the dispatcher
      AbstractGatewaySenderEventProcessor ev = this.eventProcessor;
      if (ev != null && !ev.isStopped()) {
        ev.stopProcessing();
      }

      // Stop the proxy (after the dispatcher, so the socket is still
      // alive until after the dispatcher has stopped)
      stompProxyDead();

      // Close the listeners
      for (AsyncEventListener listener : this.listeners) {
        listener.close();
      }
      logger.info(LocalizedMessage.create(LocalizedStrings.GatewayImpl_STOPPED__0, this));

      clearTempEventsAfterSenderStopped();
    } finally {
      this.getLifeCycleLock().writeLock().unlock();
    }

    if (this.isPrimary()) {
      try {
        DistributedLockService
            .destroy(getSenderAdvisor().getDLockServiceName());
      } catch (IllegalArgumentException e) {
        // service not found... ignore
      }
    }
    Set<RegionQueue> queues = getQueues();
    if (queues != null && !queues.isEmpty()) {
      for (RegionQueue q : queues) {
        ((SerialGatewaySenderQueue)q).cleanUp();
      }
    }

    this.setIsPrimary(false);
    new UpdateAttributesProcessor(this).distribute(false);
    Thread lockObtainingThread = getSenderAdvisor().getLockObtainingThread();
    if (lockObtainingThread != null && lockObtainingThread.isAlive()) {
      // wait a while for thread to terminate
      try {
        lockObtainingThread.join(3000);
      } catch (InterruptedException ex) {
        // we allowed our join to be canceled
        // reset interrupt bit so this thread knows it has been interrupted
        Thread.currentThread().interrupt();
      }
      if (lockObtainingThread.isAlive()) {
        logger.info(LocalizedMessage.create(LocalizedStrings.GatewaySender_COULD_NOT_STOP_LOCK_OBTAINING_THREAD_DURING_GATEWAY_SENDER_STOP));
      }
    }

    InternalDistributedSystem system = (InternalDistributedSystem) this.cache
        .getDistributedSystem();
    system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_STOP, this);

    this.eventProcessor = null;
  }

  /**
   * @return a one-line description containing the id, remote distributed
   *         system id, running state and primary state of this sender
   */
  @Override
  public String toString() {
    // StringBuilder is enough here: the builder never leaves this method, so
    // StringBuffer's synchronization is unnecessary.
    return new StringBuilder("SerialGatewaySender{")
        .append("id=").append(getId())
        .append(",remoteDsId=").append(getRemoteDSId())
        .append(",isRunning =").append(isRunning())
        .append(",isPrimary =").append(isPrimary())
        .append("}")
        .toString();
  }

  /**
   * Copies this sender's current configuration and state into the given
   * profile.
   *
   * @param profile the profile to fill in; expected to be a
   *        {@link GatewaySenderProfile}
   */
  @Override
  public void fillInProfile(Profile profile) {
    assert profile instanceof GatewaySenderProfile;
    GatewaySenderProfile pf = (GatewaySenderProfile)profile;
    pf.Id = getId();
    pf.startTime = getStartTime();
    pf.remoteDSId = getRemoteDSId();
    pf.isRunning = isRunning();
    pf.isPrimary = isPrimary();
    pf.isParallel = false;
    pf.isBatchConflationEnabled = isBatchConflationEnabled();
    pf.isPersistenceEnabled = isPersistenceEnabled();
    pf.alertThreshold = getAlertThreshold();
    pf.manualStart = isManualStart();
    for (com.gemstone.gemfire.cache.wan.GatewayEventFilter filter : getGatewayEventFilters()) {
      pf.eventFiltersClassNames.add(filter.getClass().getName());
    }
    for (GatewayTransportFilter filter : getGatewayTransportFilters()) {
      pf.transFiltersClassNames.add(filter.getClass().getName());
    }
    for (AsyncEventListener listener : getAsyncEventListeners()) {
      pf.senderEventListenerClassNames.add(listener.getClass().getName());
    }
    pf.isDiskSynchronous = isDiskSynchronous();
    pf.dispatcherThreads = getDispatcherThreads();
    pf.orderPolicy = getOrderPolicy();
    pf.serverLocation = this.getServerLocation();
  }

  /**
   * Replaces the event id of the cloned event with one that keeps the
   * original membership id and sequence id. The thread id is kept if it is
   * already a WAN-type thread id; otherwise it is replaced by a fake thread
   * id derived from the original thread id and this sender's event id index.
   *
   * @param clonedEvent the event whose event id is rewritten in place
   * @see com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender#setModifiedEventId(com.gemstone.gemfire.internal.cache.EntryEventImpl)
   */
  @Override
  protected void setModifiedEventId(EntryEventImpl clonedEvent) {
    EventID originalEventId = clonedEvent.getEventId();
    long originalThreadId = originalEventId.getThreadID();
    long newThreadId = originalThreadId;
    // A WAN-type thread id has already been converted; only convert the rest.
    if (!ThreadIdentifier.isWanTypeThreadID(newThreadId)) {
      newThreadId = ThreadIdentifier
          .createFakeThreadIDForParallelGSPrimaryBucket(0, originalThreadId,
              getEventIdIndex());
    }
    EventID newEventId = new EventID(originalEventId.getMembershipID(),
        newThreadId, originalEventId.getSequenceID());
    if (logger.isDebugEnabled()) {
      logger.debug("{}: Generated event id for event with key={}, original event id={}, originalThreadId={}, new event id={}, newThreadId={}",
          this, clonedEvent.getKey(), originalEventId, originalThreadId, newEventId, newThreadId);
    }
    clonedEvent.setEventId(newEventId);
  }

}
