http://git-wip-us.apache.org/repos/asf/geode/blob/bccdb56e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java index e21a834..e0b5ab8 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java @@ -12,35 +12,44 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ - package org.apache.geode.internal.cache.tier.sockets; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicIntegerArray; + +import org.apache.logging.log4j.Logger; + import org.apache.geode.CancelException; import org.apache.geode.SystemFailure; -import org.apache.geode.cache.Cache; import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.SystemTimer.SystemTimerTask; import org.apache.geode.internal.Version; -import org.apache.geode.internal.cache.*; +import org.apache.geode.internal.cache.CacheClientStatus; +import org.apache.geode.internal.cache.IncomingGatewayStatus; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.TXId; +import org.apache.geode.internal.cache.TXManagerImpl; import org.apache.geode.internal.cache.tier.Acceptor; import org.apache.geode.internal.concurrent.ConcurrentHashSet; import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.LogService; import org.apache.geode.internal.logging.LoggingThreadGroup; import org.apache.geode.internal.logging.log4j.LocalizedMessage; -import org.apache.logging.log4j.Logger; - -import java.net.InetAddress; -import java.util.*; -import java.util.concurrent.atomic.AtomicIntegerArray; /** * Class <code>ClientHealthMonitor</code> is a server-side singleton that monitors the health of * clients by looking at their heartbeats. If too much time elapses between heartbeats, the monitor * determines that the client is dead and interrupts its threads. - * - * + * * @since GemFire 4.2.3 */ public class ClientHealthMonitor { @@ -69,7 +78,7 @@ public class ClientHealthMonitor { /** * THe GemFire <code>Cache</code> */ - final protected Cache _cache; + private final InternalCache _cache; /** * A thread that validates client connections @@ -123,7 +132,7 @@ public class ClientHealthMonitor { * client has died and interrupting its sockets. * @return The singleton <code>ClientHealthMonitor</code> instance */ - public static ClientHealthMonitor getInstance(Cache cache, int maximumTimeBetweenPings, + public static ClientHealthMonitor getInstance(InternalCache cache, int maximumTimeBetweenPings, CacheClientNotifierStats stats) { createInstance(cache, maximumTimeBetweenPings, stats); return _instance; @@ -305,7 +314,7 @@ public class ClientHealthMonitor { scheduledToBeRemovedTx.removeAll(txids); } }; - ((GemFireCacheImpl) this._cache).getCCPTimer().schedule(task, timeout); + this._cache.getCCPTimer().schedule(task, timeout); } } } @@ -384,55 +393,6 @@ public class ClientHealthMonitor { } } - // /** - // * Returns modifiable map (changes do not effect this class) of memberId - // * to connection count. - // */ - // public Map getConnectedClients() { - // Map map = new HashMap(); // KEY=memberId, VALUE=connectionCount (Integer) - // synchronized (_clientThreadsLock) { - // Iterator connectedClients = this._clientThreads.entrySet().iterator(); - // while (connectedClients.hasNext()) { - // Map.Entry entry = (Map.Entry) connectedClients.next(); - // String memberId = (String) entry.getKey();// memberId includes FQDN - // Set connections = (Set) entry.getValue(); - // int socketPort = 0; - // InetAddress socketAddress = null; - // ///* - // Iterator serverConnections = connections.iterator(); - // // Get data from one. - // while (serverConnections.hasNext()) { - // ServerConnection sc = (ServerConnection) serverConnections.next(); - // socketPort = sc.getSocketPort(); - // socketAddress = sc.getSocketAddress(); - // break; - // } - // //*/ - // int connectionCount = connections.size(); - // String clientString = null; - // if (socketAddress == null) { - // clientString = "client member id=" + memberId; - // } else { - // clientString = "host name=" + socketAddress.toString() + " host ip=" + - // socketAddress.getHostAddress() + " client port=" + socketPort + " client - // member id=" + memberId; - // } - // map.put(memberId, new Object[] {clientString, new - // Integer(connectionCount)}); - // /* Note: all client addresses are same... - // Iterator serverThreads = ((Set) entry.getValue()).iterator(); - // while (serverThreads.hasNext()) { - // ServerConnection connection = (ServerConnection) serverThreads.next(); - // InetAddress clientAddress = connection.getClientAddress(); - // logger.severe("getConnectedClients: memberId=" + memberId + - // " clientAddress=" + clientAddress + " FQDN=" + - // clientAddress.getCanonicalHostName()); - // }*/ - // } - // } - // return map; - // } - /** * Returns modifiable map (changes do not effect this class) of client membershipID to connection * count. This is different from the map contained in this class as here the key is client @@ -442,7 +402,6 @@ public class ClientHealthMonitor { * @param filterProxies Set identifying the Connection proxies which should be fetched. These * ConnectionProxies may be from same client member or different. If it is null this would * mean to fetch the Connections of all the ConnectionProxy objects. - * */ public Map getConnectedClients(Set filterProxies) { Map map = new HashMap(); // KEY=proxyID, VALUE=connectionCount (Integer) @@ -677,7 +636,6 @@ public class ClientHealthMonitor { return this._clientHeartbeats; } - /** * Shuts down the singleton <code>CacheClientNotifier</code> instance. */ @@ -693,10 +651,9 @@ public class ClientHealthMonitor { * * @param cache The GemFire <code>Cache</code> * @param maximumTimeBetweenPings The maximum time allowed between pings before determining the - * client has died and interrupting its sockets. */ - protected static synchronized void createInstance(Cache cache, int maximumTimeBetweenPings, - CacheClientNotifierStats stats) { + protected static synchronized void createInstance(InternalCache cache, + int maximumTimeBetweenPings, CacheClientNotifierStats stats) { refCount++; if (_instance != null) { return; @@ -710,9 +667,8 @@ public class ClientHealthMonitor { * * @param cache The GemFire <code>Cache</code> * @param maximumTimeBetweenPings The maximum time allowed between pings before determining the - * client has died and interrupting its sockets. */ - private ClientHealthMonitor(Cache cache, int maximumTimeBetweenPings, + private ClientHealthMonitor(InternalCache cache, int maximumTimeBetweenPings, CacheClientNotifierStats stats) { // Set the Cache this._cache = cache;
http://git-wip-us.apache.org/repos/asf/geode/blob/bccdb56e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java index ecd9c7a..6eadee3 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java @@ -12,7 +12,6 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ - package org.apache.geode.internal.cache.tier.sockets; import static org.apache.geode.distributed.ConfigurationProperties.*; @@ -49,7 +48,7 @@ import org.apache.geode.internal.Assert; import org.apache.geode.internal.HeapDataOutputStream; import org.apache.geode.internal.Version; import org.apache.geode.internal.cache.EventID; -import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.tier.Acceptor; import org.apache.geode.internal.cache.tier.CachedRegionHelper; import org.apache.geode.internal.cache.tier.ClientHandShake; @@ -373,7 +372,7 @@ public class ServerConnection implements Runnable { return getCache().getDistributedSystem(); } - public Cache getCache() { + public InternalCache getCache() { return this.crHelper.getCache(); } @@ -578,7 +577,7 @@ public class ServerConnection implements Runnable { private boolean isFiringMembershipEvents() { return this.acceptor.isRunning() - && !((GemFireCacheImpl) this.acceptor.getCachedRegionHelper().getCache()).isClosed() + && !(this.acceptor.getCachedRegionHelper().getCache()).isClosed() && !acceptor.getCachedRegionHelper().getCache().getCancelCriterion().isCancelInProgress(); } http://git-wip-us.apache.org/repos/asf/geode/blob/bccdb56e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxEnum.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxEnum.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxEnum.java index 5a02525..1b599e9 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxEnum.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxEnum.java @@ -12,16 +12,13 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -/** - * - */ package org.apache.geode.internal.cache.tier.sockets.command; import java.io.IOException; import org.apache.logging.log4j.Logger; -import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.tier.Command; import org.apache.geode.internal.cache.tier.sockets.BaseCommand; import org.apache.geode.internal.cache.tier.sockets.Message; @@ -30,7 +27,6 @@ import org.apache.geode.internal.logging.LogService; import org.apache.geode.pdx.internal.EnumInfo; import org.apache.geode.pdx.internal.TypeRegistry; - public class AddPdxEnum extends BaseCommand { private static final Logger logger = LogService.getLogger(); @@ -56,7 +52,7 @@ public class AddPdxEnum extends BaseCommand { int enumId = msg.getPart(1).getInt(); try { - GemFireCacheImpl cache = (GemFireCacheImpl) servConn.getCache(); + InternalCache cache = servConn.getCache(); TypeRegistry registry = cache.getPdxRegistry(); registry.addRemoteEnum(enumId, enumInfo); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/geode/blob/bccdb56e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxType.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxType.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxType.java index 9c01e05..9b8302e 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxType.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxType.java @@ -12,18 +12,14 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -/** - * - */ package org.apache.geode.internal.cache.tier.sockets.command; import java.io.IOException; import org.apache.logging.log4j.Logger; -import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.tier.Command; -import org.apache.geode.internal.cache.tier.MessageType; import org.apache.geode.internal.cache.tier.sockets.BaseCommand; import org.apache.geode.internal.cache.tier.sockets.Message; import org.apache.geode.internal.cache.tier.sockets.ServerConnection; @@ -31,7 +27,6 @@ import org.apache.geode.internal.logging.LogService; import org.apache.geode.pdx.internal.PdxType; import org.apache.geode.pdx.internal.TypeRegistry; - public class AddPdxType extends BaseCommand { private static final Logger logger = LogService.getLogger(); @@ -61,7 +56,7 @@ public class AddPdxType extends BaseCommand { // client side. type.setTypeId(typeId); try { - GemFireCacheImpl cache = (GemFireCacheImpl) servConn.getCache(); + InternalCache cache = servConn.getCache(); TypeRegistry registry = cache.getPdxRegistry(); registry.addRemoteType(typeId, type); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/geode/blob/bccdb56e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction.java index 84e5bd0..e63ac22 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction.java @@ -26,9 +26,8 @@ import org.apache.geode.cache.execute.FunctionService; import org.apache.geode.cache.execute.ResultSender; import org.apache.geode.cache.operations.ExecuteFunctionOperationContext; import org.apache.geode.distributed.DistributedMember; -import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; -import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.control.HeapMemoryMonitor; import org.apache.geode.internal.cache.control.InternalResourceManager; import org.apache.geode.internal.cache.control.MemoryThresholds; @@ -155,7 +154,7 @@ public class ExecuteFunction extends BaseCommand { logger.debug("Executing Function on Server: " + servConn.toString() + "with context :" + context.toString()); } - GemFireCacheImpl cache = (GemFireCacheImpl) servConn.getCache(); + InternalCache cache = servConn.getCache(); HeapMemoryMonitor hmm = ((InternalResourceManager) cache.getResourceManager()).getHeapMonitor(); if (functionObject.optimizeForWrite() && cache != null && hmm.getState().isCritical() http://git-wip-us.apache.org/repos/asf/geode/blob/bccdb56e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction65.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction65.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction65.java index 102e8e8..8fafd10 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction65.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction65.java @@ -26,9 +26,8 @@ import org.apache.geode.cache.execute.FunctionService; import org.apache.geode.cache.execute.ResultSender; import org.apache.geode.cache.operations.ExecuteFunctionOperationContext; import org.apache.geode.distributed.DistributedMember; -import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; -import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.control.HeapMemoryMonitor; import org.apache.geode.internal.cache.control.InternalResourceManager; import org.apache.geode.internal.cache.control.MemoryThresholds; @@ -183,7 +182,7 @@ public class ExecuteFunction65 extends BaseCommand { if (logger.isDebugEnabled()) { logger.debug("Executing Function on Server: {} with context: {}", servConn, context); } - GemFireCacheImpl cache = (GemFireCacheImpl) servConn.getCache(); + InternalCache cache = servConn.getCache(); HeapMemoryMonitor hmm = ((InternalResourceManager) cache.getResourceManager()).getHeapMonitor(); if (functionObject.optimizeForWrite() && cache != null && hmm.getState().isCritical() http://git-wip-us.apache.org/repos/asf/geode/blob/bccdb56e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction66.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction66.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction66.java index 86b2466..d007777 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction66.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ExecuteFunction66.java @@ -36,6 +36,7 @@ import org.apache.geode.distributed.internal.DistributionManager; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.Version; import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.TXManagerImpl; import org.apache.geode.internal.cache.TXStateProxy; import org.apache.geode.internal.cache.control.HeapMemoryMonitor; @@ -220,7 +221,7 @@ public class ExecuteFunction66 extends BaseCommand { if (logger.isDebugEnabled()) { logger.debug("Executing Function on Server: {} with context: {}", servConn, context); } - GemFireCacheImpl cache = (GemFireCacheImpl) servConn.getCache(); + InternalCache cache = servConn.getCache(); HeapMemoryMonitor hmm = ((InternalResourceManager) cache.getResourceManager()).getHeapMonitor(); if (functionObject.optimizeForWrite() && cache != null && hmm.getState().isCritical() @@ -329,14 +330,14 @@ public class ExecuteFunction66 extends BaseCommand { .toString(fn.getId())); } } else { - /** + /* * if dm is null it mean cache is also null. Transactional function without cache cannot be * executed. */ final TXStateProxy txState = TXManagerImpl.getCurrentTXState(); Runnable functionExecution = new Runnable() { public void run() { - GemFireCacheImpl cache = null; + InternalCache cache = null; try { if (txState != null) { cache = GemFireCacheImpl.getExisting("executing function"); http://git-wip-us.apache.org/repos/asf/geode/blob/bccdb56e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java index 2ccca76..09f56f8 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java @@ -12,9 +12,6 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -/** - * - */ package org.apache.geode.internal.cache.tier.sockets.command; import java.io.IOException; @@ -31,12 +28,13 @@ import org.apache.geode.cache.wan.GatewayReceiver; import org.apache.geode.distributed.DistributedSystem; import org.apache.geode.distributed.internal.DistributionStats; import org.apache.geode.distributed.internal.InternalDistributedSystem; -import org.apache.geode.i18n.LogWriterI18n; +import org.apache.geode.i18n.StringId; import org.apache.geode.internal.Version; import org.apache.geode.internal.cache.EntryEventImpl; import org.apache.geode.internal.cache.EventID; import org.apache.geode.internal.cache.EventIDHolder; import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.LocalRegion; import org.apache.geode.internal.cache.tier.CachedRegionHelper; import org.apache.geode.internal.cache.tier.Command; @@ -58,7 +56,6 @@ import org.apache.geode.pdx.internal.EnumId; import org.apache.geode.pdx.internal.EnumInfo; import org.apache.geode.pdx.internal.PdxType; import org.apache.geode.pdx.internal.PeerTypeRegistration; -import org.apache.geode.i18n.StringId; public class GatewayReceiverCommand extends BaseCommand { @@ -71,8 +68,8 @@ public class GatewayReceiverCommand extends BaseCommand { private GatewayReceiverCommand() {} private void handleRegionNull(ServerConnection servConn, String regionName, int batchId) { - GemFireCacheImpl gfc = (GemFireCacheImpl) servConn.getCachedRegionHelper().getCache(); - if (gfc != null && gfc.isCacheAtShutdownAll()) { + InternalCache cache = servConn.getCachedRegionHelper().getCache(); + if (cache != null && cache.isCacheAtShutdownAll()) { throw new CacheClosedException("Shutdown occurred during message processing"); } else { String reason = LocalizedStrings.ProcessBatch_WAS_NOT_FOUND_DURING_BATCH_CREATE_REQUEST_0 @@ -808,12 +805,10 @@ public class GatewayReceiverCommand extends BaseCommand { if (key instanceof EnumId) { EnumId enumId = (EnumId) key; value = BlobHelper.deserializeBlob((byte[]) value); - ((GemFireCacheImpl) crHelper.getCache()).getPdxRegistry().addRemoteEnum(enumId.intValue(), - (EnumInfo) value); + crHelper.getCache().getPdxRegistry().addRemoteEnum(enumId.intValue(), (EnumInfo) value); } else { value = BlobHelper.deserializeBlob((byte[]) value); - ((GemFireCacheImpl) crHelper.getCache()).getPdxRegistry().addRemoteType((int) key, - (PdxType) value); + crHelper.getCache().getPdxRegistry().addRemoteType((int) key, (PdxType) value); } return true; } @@ -867,7 +862,6 @@ public class GatewayReceiverCommand extends BaseCommand { servConn.getName()), be); } } - } private static void writeFatalException(Message origMsg, Throwable exception, http://git-wip-us.apache.org/repos/asf/geode/blob/bccdb56e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXEnumById.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXEnumById.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXEnumById.java index 72e375c..54a21ed 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXEnumById.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXEnumById.java @@ -12,14 +12,11 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -/** - * - */ package org.apache.geode.internal.cache.tier.sockets.command; import java.io.IOException; -import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.tier.Command; import org.apache.geode.internal.cache.tier.MessageType; import org.apache.geode.internal.cache.tier.sockets.BaseCommand; @@ -28,7 +25,6 @@ import org.apache.geode.internal.cache.tier.sockets.ServerConnection; import org.apache.geode.pdx.internal.EnumInfo; import org.apache.geode.pdx.internal.TypeRegistry; - public class GetPDXEnumById extends BaseCommand { private final static GetPDXEnumById singleton = new GetPDXEnumById(); @@ -51,7 +47,7 @@ public class GetPDXEnumById extends BaseCommand { EnumInfo result; try { - GemFireCacheImpl cache = (GemFireCacheImpl) servConn.getCache(); + InternalCache cache = servConn.getCache(); TypeRegistry registry = cache.getPdxRegistry(); result = registry.getEnumInfoById(enumId); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/geode/blob/bccdb56e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXIdForEnum.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXIdForEnum.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXIdForEnum.java index 25bfe3d..1b21383 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXIdForEnum.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXIdForEnum.java @@ -12,14 +12,11 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -/** - * - */ package org.apache.geode.internal.cache.tier.sockets.command; import java.io.IOException; -import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.tier.Command; import org.apache.geode.internal.cache.tier.MessageType; import org.apache.geode.internal.cache.tier.sockets.BaseCommand; @@ -28,7 +25,6 @@ import org.apache.geode.internal.cache.tier.sockets.ServerConnection; import org.apache.geode.pdx.internal.EnumInfo; import org.apache.geode.pdx.internal.TypeRegistry; - public class GetPDXIdForEnum extends BaseCommand { private final static GetPDXIdForEnum singleton = new GetPDXIdForEnum(); @@ -52,7 +48,7 @@ public class GetPDXIdForEnum extends BaseCommand { int enumId; try { - GemFireCacheImpl cache = (GemFireCacheImpl) servConn.getCache(); + InternalCache cache = servConn.getCache(); TypeRegistry registry = cache.getPdxRegistry(); enumId = registry.defineEnum(enumInfo); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/geode/blob/bccdb56e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXIdForType.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXIdForType.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXIdForType.java index 3c80c76..2054196 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXIdForType.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXIdForType.java @@ -12,14 +12,11 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -/** - * - */ package org.apache.geode.internal.cache.tier.sockets.command; import java.io.IOException; -import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.tier.Command; import org.apache.geode.internal.cache.tier.MessageType; import org.apache.geode.internal.cache.tier.sockets.BaseCommand; @@ -28,7 +25,6 @@ import org.apache.geode.internal.cache.tier.sockets.ServerConnection; import org.apache.geode.pdx.internal.PdxType; import org.apache.geode.pdx.internal.TypeRegistry; - public class GetPDXIdForType extends BaseCommand { private final static GetPDXIdForType singleton = new GetPDXIdForType(); @@ -53,7 +49,7 @@ public class GetPDXIdForType extends BaseCommand { int pdxId; try { - GemFireCacheImpl cache = (GemFireCacheImpl) servConn.getCache(); + InternalCache cache = servConn.getCache(); TypeRegistry registry = cache.getPdxRegistry(); pdxId = registry.defineType(type); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/geode/blob/bccdb56e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXTypeById.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXTypeById.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXTypeById.java index 603d3d0..2470893 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXTypeById.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPDXTypeById.java @@ -12,14 +12,11 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -/** - * - */ package org.apache.geode.internal.cache.tier.sockets.command; import java.io.IOException; -import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.tier.Command; import org.apache.geode.internal.cache.tier.MessageType; import org.apache.geode.internal.cache.tier.sockets.BaseCommand; @@ -28,7 +25,6 @@ import org.apache.geode.internal.cache.tier.sockets.ServerConnection; import org.apache.geode.pdx.internal.PdxType; import org.apache.geode.pdx.internal.TypeRegistry; - public class GetPDXTypeById extends BaseCommand { private final static GetPDXTypeById singleton = new GetPDXTypeById(); @@ -51,7 +47,7 @@ public class GetPDXTypeById extends BaseCommand { PdxType type; try { - GemFireCacheImpl cache = (GemFireCacheImpl) servConn.getCache(); + InternalCache cache = servConn.getCache(); TypeRegistry registry = cache.getPdxRegistry(); type = registry.getType(pdxId); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/geode/blob/bccdb56e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPdxEnums70.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPdxEnums70.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPdxEnums70.java index ca3d559..19551c4 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPdxEnums70.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPdxEnums70.java @@ -17,7 +17,7 @@ package org.apache.geode.internal.cache.tier.sockets.command; import java.io.IOException; import java.util.Map; -import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.tier.Command; import org.apache.geode.internal.cache.tier.MessageType; import org.apache.geode.internal.cache.tier.sockets.BaseCommand; @@ -46,7 +46,7 @@ public class GetPdxEnums70 extends BaseCommand { Map<Integer, EnumInfo> enums; try { - GemFireCacheImpl cache = (GemFireCacheImpl) servConn.getCache(); + InternalCache cache = servConn.getCache(); enums = cache.getPdxRegistry().enumMap(); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/geode/blob/bccdb56e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPdxTypes70.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPdxTypes70.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPdxTypes70.java index 8b73ed7..cc96b8e 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPdxTypes70.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GetPdxTypes70.java @@ -17,7 +17,7 @@ package org.apache.geode.internal.cache.tier.sockets.command; import java.io.IOException; import java.util.Map; -import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.tier.Command; import org.apache.geode.internal.cache.tier.MessageType; import org.apache.geode.internal.cache.tier.sockets.BaseCommand; @@ -46,7 +46,7 @@ public class GetPdxTypes70 extends BaseCommand { Map<Integer, PdxType> types; try { - GemFireCacheImpl cache = (GemFireCacheImpl) servConn.getCache(); + InternalCache cache = servConn.getCache(); types = cache.getPdxRegistry().typeMap(); } catch (Exception e) { writeException(msg, e, false, servConn); http://git-wip-us.apache.org/repos/asf/geode/blob/bccdb56e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Query.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Query.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Query.java index 54235c1..d3c0393 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Query.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Query.java @@ -12,9 +12,6 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -/** - * - */ package org.apache.geode.internal.cache.tier.sockets.command; import java.io.IOException; @@ -29,7 +26,6 @@ import org.apache.geode.cache.query.SelectResults; import org.apache.geode.cache.query.internal.DefaultQuery; import org.apache.geode.cache.query.internal.types.CollectionTypeImpl; import org.apache.geode.cache.query.types.CollectionType; -import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.internal.cache.tier.Command; import org.apache.geode.internal.cache.tier.sockets.BaseCommandQuery; import org.apache.geode.internal.cache.tier.sockets.Message; @@ -65,7 +61,6 @@ public class Query extends BaseCommandQuery { servConn.setRequestSpecificTimeout(timeout); } - if (logger.isDebugEnabled()) { logger.debug("{}: Received query request from {} queryString: {}", servConn.getName(), servConn.getSocketString(), queryString); @@ -73,7 +68,7 @@ public class Query extends BaseCommandQuery { try { // Create query QueryService queryService = - ((GemFireCacheImpl) servConn.getCachedRegionHelper().getCache()).getLocalQueryService(); + servConn.getCachedRegionHelper().getCache().getLocalQueryService(); org.apache.geode.cache.query.Query query = queryService.newQuery(queryString); Set regionNames = ((DefaultQuery) query).getRegionsInQuery(null); http://git-wip-us.apache.org/repos/asf/geode/blob/bccdb56e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Query651.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Query651.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Query651.java index 4e30039..5849431 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Query651.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Query651.java @@ -12,26 +12,26 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -/** - * - */ package org.apache.geode.internal.cache.tier.sockets.command; +import java.io.IOException; +import java.util.List; +import java.util.Set; + +import org.apache.geode.cache.operations.QueryOperationContext; +import org.apache.geode.cache.query.QueryInvalidException; +import org.apache.geode.cache.query.QueryService; import org.apache.geode.cache.query.SelectResults; +import org.apache.geode.cache.query.internal.DefaultQuery; import org.apache.geode.cache.query.internal.types.CollectionTypeImpl; import org.apache.geode.cache.query.types.CollectionType; -import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.internal.cache.tier.Command; import org.apache.geode.internal.cache.tier.MessageType; -import org.apache.geode.internal.cache.tier.sockets.*; +import org.apache.geode.internal.cache.tier.sockets.BaseCommandQuery; +import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier; +import org.apache.geode.internal.cache.tier.sockets.Message; +import org.apache.geode.internal.cache.tier.sockets.ServerConnection; import org.apache.geode.internal.security.AuthorizeRequest; -import org.apache.geode.cache.operations.QueryOperationContext; -import org.apache.geode.cache.query.QueryService; -import org.apache.geode.cache.query.internal.DefaultQuery; -import org.apache.geode.cache.query.QueryInvalidException; -import java.io.IOException; -import java.util.List; -import java.util.Set; public class Query651 extends BaseCommandQuery { @@ -92,7 +92,7 @@ public class Query651 extends BaseCommandQuery { try { // Create query QueryService queryService = - ((GemFireCacheImpl) servConn.getCachedRegionHelper().getCache()).getLocalQueryService(); + servConn.getCachedRegionHelper().getCache().getLocalQueryService(); org.apache.geode.cache.query.Query query = null; if (queryParams != null) { @@ -138,6 +138,4 @@ public class Query651 extends BaseCommandQuery { protected CollectionType getCollectionType(SelectResults selectResults) { return new CollectionTypeImpl(List.class, selectResults.getCollectionType().getElementType()); } - - } http://git-wip-us.apache.org/repos/asf/geode/blob/bccdb56e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RequestEventValue.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RequestEventValue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RequestEventValue.java index a085353..3fd84d6 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RequestEventValue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RequestEventValue.java @@ -16,10 +16,8 @@ package org.apache.geode.internal.cache.tier.sockets.command; import java.io.IOException; -import org.apache.geode.LogWriter; import org.apache.geode.internal.cache.CachedDeserializable; import org.apache.geode.internal.cache.EventID; -import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.internal.cache.ha.HAContainerWrapper; import org.apache.geode.internal.cache.tier.CachedRegionHelper; import org.apache.geode.internal.cache.tier.Command; http://git-wip-us.apache.org/repos/asf/geode/blob/bccdb56e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommand.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommand.java index e324a7f..72eab50 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommand.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommand.java @@ -14,21 +14,26 @@ */ package org.apache.geode.internal.cache.tier.sockets.command; +import java.io.IOException; + import org.apache.geode.cache.TransactionDataNodeHasDepartedException; import org.apache.geode.distributed.internal.DistributionManager; import org.apache.geode.distributed.internal.ReplyException; import org.apache.geode.distributed.internal.WaitForViewInstallation; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.Assert; -import org.apache.geode.internal.cache.*; +import org.apache.geode.internal.cache.FindRemoteTXMessage; import org.apache.geode.internal.cache.FindRemoteTXMessage.FindRemoteTXMessageReplyProcessor; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.PeerTXStateStub; +import org.apache.geode.internal.cache.TXId; +import org.apache.geode.internal.cache.TXManagerImpl; +import org.apache.geode.internal.cache.TXStateProxy; import org.apache.geode.internal.cache.tier.Command; import org.apache.geode.internal.cache.tier.sockets.BaseCommand; import org.apache.geode.internal.cache.tier.sockets.Message; import org.apache.geode.internal.cache.tier.sockets.ServerConnection; -import java.io.IOException; - /** * Used for bootstrapping txState/PeerTXStateStub on the server. This command is send when in client * in a transaction is about to failover to this server @@ -91,7 +96,7 @@ public class TXFailoverCommand extends BaseCommand { // bug #42228 and bug #43504 - this cannot return until the current view // has been installed by all members, so that dlocks are released and // the same keys can be used in a new transaction by the same client thread - GemFireCacheImpl cache = (GemFireCacheImpl) servConn.getCache(); + InternalCache cache = servConn.getCache(); try { WaitForViewInstallation.send((DistributionManager) cache.getDistributionManager()); } catch (InterruptedException e) { http://git-wip-us.apache.org/repos/asf/geode/blob/bccdb56e/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java index 8a1f8b1..ded789e 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java @@ -14,6 +14,14 @@ */ package org.apache.geode.internal.cache.tx; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; + +import javax.transaction.Status; + +import org.apache.logging.log4j.Logger; + import org.apache.geode.cache.CommitConflictException; import org.apache.geode.cache.TransactionDataNodeHasDepartedException; import org.apache.geode.cache.TransactionException; @@ -26,30 +34,22 @@ import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.distributed.internal.ServerLocation; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; -import org.apache.geode.internal.cache.*; +import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.LocalRegion; +import org.apache.geode.internal.cache.TXCommitMessage; +import org.apache.geode.internal.cache.TXLockRequest; +import org.apache.geode.internal.cache.TXRegionLockRequestImpl; +import org.apache.geode.internal.cache.TXStateProxy; +import org.apache.geode.internal.cache.TXStateStub; import org.apache.geode.internal.cache.locks.TXRegionLockRequest; import org.apache.geode.internal.cache.tx.TransactionalOperation.ServerRegionOperation; import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.LogService; -import org.apache.logging.log4j.Logger; - -import javax.transaction.Status; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; public class ClientTXStateStub extends TXStateStub { private static final Logger logger = LogService.getLogger(); - // /** a flag to turn off automatic replay of transactions. Maybe this should be a pool property? - // */ - // private static final boolean ENABLE_REPLAY = - // Boolean.getBoolean("gemfire.enable-transaction-replay"); - // - // /** time to pause between transaction replays, in millis */ - // private static final int TRANSACTION_REPLAY_PAUSE = - // Integer.getInteger("gemfire.transaction-replay-pause", 500).intValue(); - /** test hook - used to find out what operations were performed in the last tx */ private static ThreadLocal<List<TransactionalOperation>> recordedTransactionalOperations = null; @@ -91,8 +91,6 @@ public class ClientTXStateStub extends TXStateStub { recordedTransactionalOperations = t; } - - public ClientTXStateStub(TXStateProxy stateProxy, DistributedMember target, LocalRegion firstRegion) { super(stateProxy, target); @@ -124,7 +122,7 @@ public class ClientTXStateStub extends TXStateStub { */ private void obtainLocalLocks() { lockReq = new TXLockRequest(); - GemFireCacheImpl cache = GemFireCacheImpl.getExisting(""); + InternalCache cache = GemFireCacheImpl.getExisting(""); for (TransactionalOperation txOp : this.recordedOperations) { if (ServerRegionOperation.lockKeyForTx(txOp.getOperation())) { TXRegionLockRequest rlr = lockReq.getRegionLockRequest(txOp.getRegionName()); @@ -160,7 +158,7 @@ public class ClientTXStateStub extends TXStateStub { this.internalAfterSendCommit.run(); } - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); + InternalCache cache = GemFireCacheImpl.getInstance(); if (cache == null) { // fixes bug 42933 return; @@ -177,7 +175,6 @@ public class ClientTXStateStub extends TXStateStub { txcm.basicProcess(); } - @Override protected TXRegionStub generateRegionStub(LocalRegion region) { return new ClientTXRegionStub(region); http://git-wip-us.apache.org/repos/asf/geode/blob/bccdb56e/geode-core/src/main/java/org/apache/geode/internal/cache/tx/DistTxEntryEvent.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/DistTxEntryEvent.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/DistTxEntryEvent.java index 1aed187..6df6eb9 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/DistTxEntryEvent.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/DistTxEntryEvent.java @@ -30,16 +30,13 @@ import org.apache.geode.internal.cache.DistributedRemoveAllOperation.RemoveAllEn import org.apache.geode.internal.cache.EntryEventImpl; import org.apache.geode.internal.cache.EventID; import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.LocalRegion; import org.apache.geode.internal.cache.DistributedPutAllOperation.EntryVersionsList; import org.apache.geode.internal.cache.DistributedPutAllOperation.PutAllEntryData; import org.apache.geode.internal.cache.versions.VersionTag; import org.apache.geode.internal.offheap.annotations.Retained; -/** - * - * - */ public class DistTxEntryEvent extends EntryEventImpl { protected static final byte HAS_PUTALL_OP = 0x1; @@ -100,7 +97,7 @@ public class DistTxEntryEvent extends EntryEventImpl { public void fromData(DataInput in) throws IOException, ClassNotFoundException { this.eventID = (EventID) DataSerializer.readObject(in); String regionName = DataSerializer.readString(in); - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); + InternalCache cache = GemFireCacheImpl.getInstance(); this.region = (LocalRegion) cache.getRegion(regionName); this.op = Operation.fromOrdinal(in.readByte()); Object key = DataSerializer.readObject(in); http://git-wip-us.apache.org/repos/asf/geode/blob/bccdb56e/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java index 832391d..fd128c3 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java @@ -24,17 +24,12 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; -import org.apache.geode.InternalGemFireError; -import org.apache.geode.internal.cache.InternalCache; -import org.apache.geode.internal.cache.execute.BucketMovedException; -import org.apache.geode.internal.cache.ha.ThreadIdentifier; -import org.apache.geode.internal.cache.wan.parallel.WaitUntilParallelGatewaySenderFlushedCoordinator; import org.apache.logging.log4j.Logger; import org.apache.geode.CancelCriterion; import org.apache.geode.CancelException; +import org.apache.geode.InternalGemFireError; import org.apache.geode.cache.AttributesFactory; -import org.apache.geode.cache.Cache; import org.apache.geode.cache.CacheException; import org.apache.geode.cache.DataPolicy; import org.apache.geode.cache.Region; @@ -61,13 +56,16 @@ import org.apache.geode.distributed.internal.ServerLocation; import org.apache.geode.internal.cache.CachePerfStats; import org.apache.geode.internal.cache.EntryEventImpl; import org.apache.geode.internal.cache.EnumListenerEvent; -import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.internal.cache.HasCachePerfStats; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.InternalRegionArguments; import org.apache.geode.internal.cache.LocalRegion; import org.apache.geode.internal.cache.PartitionedRegion; import org.apache.geode.internal.cache.RegionQueue; +import org.apache.geode.internal.cache.execute.BucketMovedException; +import org.apache.geode.internal.cache.ha.ThreadIdentifier; import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue; +import org.apache.geode.internal.cache.wan.parallel.WaitUntilParallelGatewaySenderFlushedCoordinator; import org.apache.geode.internal.cache.wan.serial.ConcurrentSerialGatewaySenderEventProcessor; import org.apache.geode.internal.cache.xmlcache.CacheCreation; import org.apache.geode.internal.i18n.LocalizedStrings; @@ -520,7 +518,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi this.getLifeCycleLock().writeLock().lock(); // first, check if this sender is attached to any region. If so, throw // GatewaySenderException - Set<LocalRegion> regions = ((GemFireCacheImpl) this.cache).getApplicationRegions(); + Set<LocalRegion> regions = this.cache.getApplicationRegions(); Iterator regionItr = regions.iterator(); while (regionItr.hasNext()) { LocalRegion region = (LocalRegion) regionItr.next(); @@ -541,7 +539,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi } // remove the sender from the cache - ((GemFireCacheImpl) this.cache).removeGatewaySender(this); + this.cache.removeGatewaySender(this); // destroy the region underneath the sender's queue if (initiator) { @@ -816,7 +814,6 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi return true; } - public void distribute(EnumListenerEvent operation, EntryEventImpl event, List<Integer> allRemoteDSIds) { @@ -981,7 +978,6 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi } } - /** * During sender is getting started, if there are any cache operation on queue then that event * will be stored in temp queue. Once sender is started, these event from tmp queue will be added @@ -1100,8 +1096,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi boolean gotLock = false; try { // Obtain the distributed lock - gotLock = ((GemFireCacheImpl) getCache()).getGatewaySenderLockService() - .lock(META_DATA_REGION_NAME, -1, -1); + gotLock = getCache().getGatewaySenderLockService().lock(META_DATA_REGION_NAME, -1, -1); if (!gotLock) { throw new IllegalStateException( LocalizedStrings.AbstractGatewaySender_FAILED_TO_LOCK_META_REGION_0 @@ -1143,7 +1138,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi } finally { // Unlock the lock if necessary if (gotLock) { - ((GemFireCacheImpl) getCache()).getGatewaySenderLockService().unlock(META_DATA_REGION_NAME); + getCache().getGatewaySenderLockService().unlock(META_DATA_REGION_NAME); if (isDebugEnabled) { logger.debug("{}: Unlocked the metadata region", this); } @@ -1161,7 +1156,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi @SuppressWarnings({"rawtypes", "unchecked", "deprecation"}) private static synchronized Region<String, Integer> initializeEventIdIndexMetaDataRegion( AbstractGatewaySender sender) { - final Cache cache = sender.getCache(); + final InternalCache cache = sender.getCache(); Region<String, Integer> region = cache.getRegion(META_DATA_REGION_NAME); if (region == null) { // Create region attributes (must be done this way to use InternalRegionArguments) @@ -1183,7 +1178,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi // Create the region try { - region = ((GemFireCacheImpl) cache).createVMRegion(META_DATA_REGION_NAME, ra, ira); + region = cache.createVMRegion(META_DATA_REGION_NAME, ra, ira); } catch (RegionExistsException e) { region = cache.getRegion(META_DATA_REGION_NAME); } catch (Exception e) { @@ -1216,7 +1211,6 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi } } - public int getTmpQueuedEventSize() { if (tmpQueuedEvents != null) { return tmpQueuedEvents.size(); @@ -1300,8 +1294,6 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi * allows us to defer creation of the GatewaySenderEventImpl until we are ready to actually * enqueue it. The caller is responsible for giving us an EntryEventImpl that we own and that we * will release. This is done by making a copy/clone of the original event. This fixes bug 52029. - * - * */ public static class TmpQueueEvent implements Releasable { private final EnumListenerEvent operation; http://git-wip-us.apache.org/repos/asf/geode/blob/bccdb56e/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java index dfbd664..1572e37 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java @@ -14,15 +14,42 @@ */ package org.apache.geode.internal.cache.wan; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.logging.log4j.Logger; + import org.apache.geode.CancelException; import org.apache.geode.GemFireException; import org.apache.geode.SystemFailure; -import org.apache.geode.cache.*; +import org.apache.geode.cache.CacheException; +import org.apache.geode.cache.EntryEvent; +import org.apache.geode.cache.Operation; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionDestroyedException; import org.apache.geode.cache.wan.GatewayEventFilter; import org.apache.geode.cache.wan.GatewayQueueEvent; import org.apache.geode.cache.wan.GatewaySender; import org.apache.geode.distributed.internal.DistributionConfig; -import org.apache.geode.internal.cache.*; +import org.apache.geode.internal.cache.BucketRegion; +import org.apache.geode.internal.cache.Conflatable; +import org.apache.geode.internal.cache.DistributedRegion; +import org.apache.geode.internal.cache.EntryEventImpl; +import org.apache.geode.internal.cache.EnumListenerEvent; +import org.apache.geode.internal.cache.EventID; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.LocalRegion; +import org.apache.geode.internal.cache.PartitionedRegion; +import org.apache.geode.internal.cache.RegionQueue; import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue; import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue; import org.apache.geode.internal.cache.wan.serial.SerialGatewaySenderQueue; @@ -31,13 +58,6 @@ import org.apache.geode.internal.logging.LogService; import org.apache.geode.internal.logging.LoggingThreadGroup; import org.apache.geode.internal.logging.log4j.LocalizedMessage; import org.apache.geode.pdx.internal.PeerTypeRegistration; -import org.apache.logging.log4j.Logger; - -import java.io.IOException; -import java.util.*; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; /** * EventProcessor responsible for peeking from queue and handling over the events to the dispatcher. @@ -46,7 +66,6 @@ import java.util.concurrent.ConcurrentHashMap; * GatewaySenderEventRemoteDispatcher or GatewaySenderEventCallbackDispatcher. * * @since GemFire 7.0 - * */ public abstract class AbstractGatewaySenderEventProcessor extends Thread { @@ -391,7 +410,6 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { // list of filteredList + pdxEventsToBeDispatched events List<GatewaySenderEventImpl> eventsToBeDispatched = new ArrayList<GatewaySenderEventImpl>(); - for (;;) { if (stopped()) { break; @@ -426,7 +444,6 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { // filtering VERSION_ACTION events from being sent. boolean sendUpdateVersionEvents = shouldSendVersionEvents(this.dispatcher); - // sleep a little bit, look for events boolean interrupted = Thread.interrupted(); try { @@ -435,7 +452,6 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { resetLastPeekedEvents = false; } - { // Below code was added to consider the case of queue region is // destroyed due to userPRs localdestroy or destroy operation. @@ -761,7 +777,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { List<GatewaySenderEventImpl> pdxEventsToBeDispatched = new ArrayList<GatewaySenderEventImpl>(); // getPDXRegion - GemFireCacheImpl cache = (GemFireCacheImpl) this.sender.getCache(); + InternalCache cache = this.sender.getCache(); Region<Object, Object> pdxRegion = cache.getRegion(PeerTypeRegistration.REGION_NAME); if (rebuildPdxList) { @@ -782,7 +798,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { EntryEventImpl event = EntryEventImpl.create((LocalRegion) pdxRegion, Operation.UPDATE, typeEntry.getKey(), typeEntry.getValue(), null, false, cache.getMyId()); event.disallowOffHeapValues(); - event.setEventId(new EventID(cache.getSystem())); + event.setEventId(new EventID(cache.getInternalDistributedSystem())); List<Integer> allRemoteDSIds = new ArrayList<Integer>(); for (GatewaySender sender : cache.getGatewaySenders()) { allRemoteDSIds.add(sender.getRemoteDSId()); @@ -805,7 +821,6 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { } } - Iterator<GatewaySenderEventImpl> iterator = pdxSenderEventsList.iterator(); while (iterator.hasNext()) { GatewaySenderEventImpl pdxEvent = iterator.next(); @@ -838,7 +853,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { * @param remotePdxSize */ public void checkIfPdxNeedsResend(int remotePdxSize) { - GemFireCacheImpl cache = (GemFireCacheImpl) this.sender.getCache(); + InternalCache cache = this.sender.getCache(); Region<Object, Object> pdxRegion = cache.getRegion(PeerTypeRegistration.REGION_NAME); // The peer has not seen all of our PDX types. This may be because @@ -976,7 +991,6 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { } eventQueueRemove(events.size()); } - } public void handleUnSuccessBatchAck(int bId) { @@ -1014,7 +1028,6 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { } } - public abstract void initializeEventDispatcher(); public GatewaySenderEventDispatcher getDispatcher() { @@ -1248,11 +1261,6 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { ((ParallelGatewaySenderQueue) this.queue).clear(pr, bucketId); } - /* - * public int size(PartitionedRegion pr, int bucketId) throws ForceReattemptException { return - * ((ParallelGatewaySenderQueue)this.queue).size(pr, bucketId); } - */ - public void notifyEventProcessorIfRequired(int bucketId) { ((ParallelGatewaySenderQueue) this.queue).notifyEventProcessorIfRequired(); } http://git-wip-us.apache.org/repos/asf/geode/blob/bccdb56e/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java index c831b26..9472792 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java @@ -28,8 +28,8 @@ import org.apache.logging.log4j.Logger; import org.apache.geode.CancelException; import org.apache.geode.DataSerializer; -import org.apache.geode.cache.Cache; import org.apache.geode.cache.asyncqueue.AsyncEventListener; +import org.apache.geode.cache.util.Gateway; import org.apache.geode.cache.wan.GatewaySender.OrderPolicy; import org.apache.geode.cache.wan.GatewayTransportFilter; import org.apache.geode.distributed.DistributedLockService; @@ -44,6 +44,7 @@ import org.apache.geode.internal.Assert; import org.apache.geode.internal.InternalDataSerializer; import org.apache.geode.internal.Version; import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.UpdateAttributesProcessor; import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.LogService; @@ -148,12 +149,6 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { .toString(new Object[] {sp.Id, sp.manualStart, sender.isManualStart()})); } } - /* - * if(sp.dispatcherThreads != sender.getDispatcherThreads()) { throw new IllegalStateException( - * LocalizedStrings. - * GatewaySenderAdvisor_CANNOT_CREATE_GATEWAYSENDER_0_WITH_DISPATCHER_THREAD_1_BECAUSE_ANOTHER_CACHE_HAS_THE_SAME_SENDER_WITH_DISPATCHER_THREAD_2 - * .toString(new Object[] { sp.Id, sp.dispatcherThreads, sender.getDispatcherThreads() })); } - */ if (!sp.isParallel) { if (sp.orderPolicy != sender.getOrderPolicy()) { @@ -232,9 +227,7 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { * if there are no other primary senders then this sender should volunteer for primary. 2. If this * sender is primary and its policy is secondary then this sender should release the lock so that * other primary sender which s waiting on lock will get the lock. - * */ - @Override public void profileUpdated(Profile profile) { if (profile instanceof GatewaySenderProfile) { @@ -299,8 +292,7 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { } public void initDLockService() { - InternalDistributedSystem ds = - ((GemFireCacheImpl) this.sender.getCache()).getInternalDistributedSystem(); + InternalDistributedSystem ds = this.sender.getCache().getInternalDistributedSystem(); String dlsName = getDLockServiceName(); this.lockService = DistributedLockService.getServiceNamed(dlsName); if (this.lockService == null) { @@ -560,8 +552,7 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { this.isDiskSynchronous = in.readBoolean(); this.dispatcherThreads = in.readInt(); if (InternalDataSerializer.getVersionForDataStream(in).compareTo(Version.CURRENT) < 0) { - org.apache.geode.cache.util.Gateway.OrderPolicy oldOrderPolicy = - DataSerializer.readObject(in); + Gateway.OrderPolicy oldOrderPolicy = DataSerializer.readObject(in); if (oldOrderPolicy != null) { if (oldOrderPolicy.name().equals(OrderPolicy.KEY.name())) { this.orderPolicy = OrderPolicy.KEY; @@ -604,14 +595,12 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { if (InternalDataSerializer.getVersionForDataStream(out).compareTo(Version.CURRENT) < 0 && this.orderPolicy != null) { String orderPolicyName = this.orderPolicy.name(); - if (orderPolicyName.equals(org.apache.geode.cache.util.Gateway.OrderPolicy.KEY.name())) { - DataSerializer.writeObject(org.apache.geode.cache.util.Gateway.OrderPolicy.KEY, out); - } else if (orderPolicyName - .equals(org.apache.geode.cache.util.Gateway.OrderPolicy.THREAD.name())) { - DataSerializer.writeObject(org.apache.geode.cache.util.Gateway.OrderPolicy.THREAD, out); + if (orderPolicyName.equals(Gateway.OrderPolicy.KEY.name())) { + DataSerializer.writeObject(Gateway.OrderPolicy.KEY, out); + } else if (orderPolicyName.equals(Gateway.OrderPolicy.THREAD.name())) { + DataSerializer.writeObject(Gateway.OrderPolicy.THREAD, out); } else { - DataSerializer.writeObject(org.apache.geode.cache.util.Gateway.OrderPolicy.PARTITION, - out); + DataSerializer.writeObject(Gateway.OrderPolicy.PARTITION, out); } } else { DataSerializer.writeObject(orderPolicy, out); @@ -699,10 +688,9 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { @Override public void processIncoming(DistributionManager dm, String adviseePath, boolean removeProfile, boolean exchangeProfiles, final List<Profile> replyProfiles) { - Cache cache = GemFireCacheImpl.getInstance(); + InternalCache cache = GemFireCacheImpl.getInstance(); if (cache != null) { - AbstractGatewaySender sender = - (AbstractGatewaySender) ((GemFireCacheImpl) cache).getGatewaySender(adviseePath); + AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(adviseePath); handleDistributionAdvisee(sender, removeProfile, exchangeProfiles, replyProfiles); } } @@ -714,7 +702,6 @@ public class GatewaySenderAdvisor extends DistributionAdvisor { sb.append("; remoteDSName=" + this.remoteDSId); sb.append("; isRunning=" + this.isRunning); sb.append("; isPrimary=" + this.isPrimary); - } } http://git-wip-us.apache.org/repos/asf/geode/blob/bccdb56e/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java index ffe7ae0..ed6df0b 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java @@ -34,15 +34,12 @@ import org.apache.geode.InternalGemFireException; import org.apache.geode.cache.CacheException; import org.apache.geode.cache.EntryEvent; import org.apache.geode.cache.Region; -import org.apache.geode.cache.wan.GatewayQueueEvent; import org.apache.geode.internal.cache.EntryEventImpl; import org.apache.geode.internal.cache.EnumListenerEvent; -import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.internal.cache.LocalRegion; import org.apache.geode.internal.cache.RegionQueue; import org.apache.geode.internal.cache.wan.AbstractGatewaySender; import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor; -import org.apache.geode.internal.cache.wan.GatewaySenderEventCallbackDispatcher; import org.apache.geode.internal.cache.wan.GatewaySenderEventDispatcher; import org.apache.geode.internal.cache.wan.GatewaySenderException; import org.apache.geode.internal.i18n.LocalizedStrings; @@ -58,8 +55,6 @@ import org.apache.geode.internal.logging.log4j.LocalizedMessage; * * The {@link ParallelGatewaySenderQueue} should be shared among all the * {@link ParallelGatewaySenderEventProcessor}s. - * - * */ public class ConcurrentParallelGatewaySenderEventProcessor extends AbstractGatewaySenderEventProcessor { @@ -67,8 +62,9 @@ public class ConcurrentParallelGatewaySenderEventProcessor protected static final Logger logger = LogService.getLogger(); protected ParallelGatewaySenderEventProcessor processors[]; - // private final List<ConcurrentParallelGatewaySenderQueue> concurrentParallelQueues; + private GemFireException ex = null; + final int nDispatcher; public ConcurrentParallelGatewaySenderEventProcessor(AbstractGatewaySender sender) { @@ -94,8 +90,7 @@ public class ConcurrentParallelGatewaySenderEventProcessor // gets the remaining // bucket Set<Region> targetRs = new HashSet<Region>(); - for (LocalRegion pr : ((GemFireCacheImpl) ((AbstractGatewaySender) sender).getCache()) - .getApplicationRegions()) { + for (LocalRegion pr : sender.getCache().getApplicationRegions()) { if (pr.getAllGatewaySenderIds().contains(sender.getId())) { targetRs.add(pr); } @@ -124,17 +119,7 @@ public class ConcurrentParallelGatewaySenderEventProcessor @Override protected void initializeMessageQueue(String id) { - /* - * Set<Region> targetRs = new HashSet<Region>(); for (LocalRegion pr : - * ((GemFireCacheImpl)((ParallelGatewaySenderImpl)sender) .getCache()).getApplicationRegions()) - * { if (pr.getAllGatewaySenderIds().contains(id)) { targetRs.add(pr); } } - */ - // this.parallelQueue = new ParallelGatewaySenderQueue(this.sender, targetRs); - /* - * if (sender.getIsHDFSQueue()) this.parallelQueue = new - * HDFSParallelGatewaySenderQueue(this.sender, targetRs); else this.parallelQueue = new - * ParallelGatewaySenderQueue(this.sender, targetRs); - */ + // nothing } @Override @@ -148,14 +133,6 @@ public class ConcurrentParallelGatewaySenderEventProcessor } int pId = bucketId % this.nDispatcher; this.processors[pId].enqueueEvent(operation, event, substituteValue); - - /* - * if (getSender().beforeEnqueue(gatewayQueueEvent)) { long start = - * getSender().getStatistics().startTime(); try { this.parallelQueue.put(gatewayQueueEvent); } - * catch (InterruptedException e) { e.printStackTrace(); } finally { if (gatewayQueueEvent != - * null) { gatewayQueueEvent.release(); } getSender().getStatistics().endPut(start); } else { - * getSender().getStatistics().incEventsFiltered(); } - */ } @Override @@ -196,7 +173,6 @@ public class ConcurrentParallelGatewaySenderEventProcessor } } - private void waitForRunningStatus() { for (ParallelGatewaySenderEventProcessor parallelProcessor : this.processors) { synchronized (parallelProcessor.runningStateLock) { @@ -218,7 +194,6 @@ public class ConcurrentParallelGatewaySenderEventProcessor } } - @Override public void stopProcessing() { if (!this.isAlive()) { @@ -299,7 +274,6 @@ public class ConcurrentParallelGatewaySenderEventProcessor for (ParallelGatewaySenderEventProcessor parallelProcessor : this.processors) { parallelProcessor.waitForDispatcherToPause(); } - // super.waitForDispatcherToPause(); } @Override @@ -330,24 +304,12 @@ public class ConcurrentParallelGatewaySenderEventProcessor } return l; } - /* - * public List<ConcurrentParallelGatewaySenderQueue> getConcurrentParallelQueues() { return - * concurrentParallelQueues; } - */ @Override public RegionQueue getQueue() { return this.queue; } - /* - * public Set<PartitionedRegion> getRegions() { return - * ((ParallelGatewaySenderQueue)(processors[0].getQueue())).getRegions(); } - * - * public int localSize() { return - * ((ParallelGatewaySenderQueue)(processors[0].getQueue())).localSize(); } - */ - @Override public GatewaySenderEventDispatcher getDispatcher() { return this.processors[0].getDispatcher();// Suranjan is that fine?? http://git-wip-us.apache.org/repos/asf/geode/blob/bccdb56e/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java index faf7836..e74270f 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java @@ -12,47 +12,32 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -/** - * - */ package org.apache.geode.internal.cache.wan.parallel; import java.io.IOException; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.BlockingQueue; import org.apache.logging.log4j.Logger; import org.apache.geode.cache.CacheException; import org.apache.geode.cache.EntryEvent; import org.apache.geode.cache.Region; -import org.apache.geode.cache.wan.GatewayQueueEvent; import org.apache.geode.internal.cache.Conflatable; import org.apache.geode.internal.cache.DistributedRegion; import org.apache.geode.internal.cache.EntryEventImpl; import org.apache.geode.internal.cache.EnumListenerEvent; import org.apache.geode.internal.cache.EventID; -import org.apache.geode.internal.cache.ForceReattemptException; -import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.internal.cache.LocalRegion; +import org.apache.geode.internal.cache.PartitionedRegion; import org.apache.geode.internal.cache.wan.AbstractGatewaySender; import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor; import org.apache.geode.internal.cache.wan.GatewaySenderEventCallbackDispatcher; import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl; import org.apache.geode.internal.logging.LogService; import org.apache.geode.internal.logging.LoggingThreadGroup; -import org.apache.geode.internal.cache.PartitionedRegion; -import org.apache.geode.internal.size.SingleObjectSizer; -import java.io.IOException; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.BlockingQueue; - - -/** - * - */ public class ParallelGatewaySenderEventProcessor extends AbstractGatewaySenderEventProcessor { private static final Logger logger = LogService.getLogger(); @@ -80,7 +65,6 @@ public class ParallelGatewaySenderEventProcessor extends AbstractGatewaySenderEv "Event Processor for GatewaySender_" + sender.getId() + "_" + id, sender); this.index = id; this.nDispatcher = nDispatcher; - // this.queue = new ParallelGatewaySenderQueue(sender, userRegions, id, nDispatcher); initializeMessageQueue(sender.getId()); setDaemon(true); } @@ -88,8 +72,7 @@ public class ParallelGatewaySenderEventProcessor extends AbstractGatewaySenderEv @Override protected void initializeMessageQueue(String id) { Set<Region> targetRs = new HashSet<Region>(); - for (LocalRegion region : ((GemFireCacheImpl) ((AbstractGatewaySender) sender).getCache()) - .getApplicationRegions()) { + for (LocalRegion region : sender.getCache().getApplicationRegions()) { if (region.getAllGatewaySenderIds().contains(id)) { targetRs.add(region); } @@ -128,12 +111,7 @@ public class ParallelGatewaySenderEventProcessor extends AbstractGatewaySenderEv return; } - // TODO : Kishor : Looks like for PDX region bucket id is set to -1. - // int bucketId = -1; - // if (!(region instanceof DistributedRegion && ((DistributedRegion)region) - // .isPdxTypesRegion())) { - // bucketId = PartitionedRegionHelper.getHashKey(event); - // } + // TODO: Looks like for PDX region bucket id is set to -1. boolean queuedEvent = false; try { EventID eventID = ((EntryEventImpl) event).getEventId(); @@ -143,7 +121,6 @@ public class ParallelGatewaySenderEventProcessor extends AbstractGatewaySenderEv gatewayQueueEvent = new GatewaySenderEventImpl(operation, event, substituteValue, true, eventID.getBucketID()); - if (getSender().beforeEnqueue(gatewayQueueEvent)) { long start = getSender().getStatistics().startTime(); try { @@ -170,11 +147,6 @@ public class ParallelGatewaySenderEventProcessor extends AbstractGatewaySenderEv ((ParallelGatewaySenderQueue) this.queue).clear(pr, bucketId); } - /* - * public int size(PartitionedRegion pr, int bucketId) throws ForceReattemptException { return - * ((ParallelGatewaySenderQueue)this.queue).size(pr, bucketId); } - */ - public void notifyEventProcessorIfRequired(int bucketId) { ((ParallelGatewaySenderQueue) this.queue).notifyEventProcessorIfRequired(); } @@ -196,19 +168,16 @@ public class ParallelGatewaySenderEventProcessor extends AbstractGatewaySenderEv } public void addShadowPartitionedRegionForUserPR(PartitionedRegion pr) { - // TODO Auto-generated method stub ((ParallelGatewaySenderQueue) this.queue).addShadowPartitionedRegionForUserPR(pr); } public void addShadowPartitionedRegionForUserRR(DistributedRegion userRegion) { - // TODO Auto-generated method stub ((ParallelGatewaySenderQueue) this.queue).addShadowPartitionedRegionForUserRR(userRegion); } @Override protected void rebalance() { // No operation for AsyncEventQueuerProcessor - } @Override
