This is an automated email from the ASF dual-hosted git repository.
klund pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new c19fda5 GEODE-5135: Refactor AbstractRegionMap dependencies (#3235)
c19fda5 is described below
commit c19fda5a05c7ce0b064e5def8e65b86804ab76a2
Author: Axel Boldt-Christmas <[email protected]>
AuthorDate: Fri Mar 1 18:09:08 2019 +0100
GEODE-5135: Refactor AbstractRegionMap dependencies (#3235)
`BaseRegionMap` will be an abstract base class for `AbstractRegionMap`
and `ProxyRegionMap`. Will move common functionality into this class.
Aims to remove dependency between `AbstractRegionMap` and `ProxyRegionMap`
Move ARMLockTestHook to RegionMap to remove circular dependency
while keeping the impact as small as possible.
Move `forceInvalidateEvent`, `shouldInvokeCallbacks` and
`switchEventOwnerAndOriginRemote` to `BaseRegionMap` to reduce
dependency between `ProxyRegionMap` and `AbstractRegionMap`
Move `createCallbackEvent` from `AbstractRegionMap` to `EntryEventImpl`
This reduces dependecy between `AbstractRegionMap` and `ProxyRegionMap`
Add `EntryEventFactory` class and move `createCallbackEvent`.
Also cleanup `BaseRegionMap` prefix refactoring artifact
Change `EntryEventFactory` to `EntryEventFactoryImpl` that
implements new `EntryEventFactory` interface to make it
availiable for Mockito testing. This requires `createCallbackEvent`
to be a non-static method.
Add private EntryEventFactory field in classes where it is used.
Reduce number of object instantiations.
Co-Authored-By: Patric Lantz <[email protected]>
Co-Authored-By: Sayyed Ali Kiaian Mousavy <[email protected]>
Co-Authored-By: Nicole Jagelid <[email protected]>
Co-Authored-By: ddahlgren95 <[email protected]>
---
.../geode/internal/cache/AbstractRegionMap.java | 301 ++-------------------
.../apache/geode/internal/cache/BaseRegionMap.java | 156 +++++++++++
.../geode/internal/cache/DistributedRegion.java | 2 +-
.../geode/internal/cache/EntryEventFactory.java | 36 +++
.../internal/cache/EntryEventFactoryImpl.java | 146 ++++++++++
.../apache/geode/internal/cache/LocalRegion.java | 2 +-
.../geode/internal/cache/ProxyRegionMap.java | 112 ++------
.../org/apache/geode/internal/cache/RegionMap.java | 22 +-
.../geode/internal/cache/TXCommitMessage.java | 7 +-
9 files changed, 413 insertions(+), 371 deletions(-)
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
index 5789949..5ccc2fc 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
@@ -28,7 +28,6 @@ import org.apache.logging.log4j.Logger;
import org.apache.geode.GemFireIOException;
import org.apache.geode.InvalidDeltaException;
import org.apache.geode.annotations.internal.MutableForTesting;
-import org.apache.geode.cache.CacheEvent;
import org.apache.geode.cache.CacheWriterException;
import org.apache.geode.cache.DiskAccessException;
import org.apache.geode.cache.EntryNotFoundException;
@@ -40,19 +39,14 @@ import
org.apache.geode.cache.query.IndexMaintenanceException;
import org.apache.geode.cache.query.QueryException;
import org.apache.geode.cache.query.internal.index.IndexManager;
import org.apache.geode.cache.query.internal.index.IndexProtocol;
-import org.apache.geode.distributed.DistributedMember;
-import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.DistributionManager;
import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.cache.DiskInitFile.DiskRegionFlag;
-import org.apache.geode.internal.cache.FilterRoutingInfo.FilterInfo;
import org.apache.geode.internal.cache.entries.AbstractOplogDiskRegionEntry;
import org.apache.geode.internal.cache.entries.AbstractRegionEntry;
import org.apache.geode.internal.cache.entries.DiskEntry;
import org.apache.geode.internal.cache.entries.OffHeapRegionEntry;
-import org.apache.geode.internal.cache.eviction.EvictableEntry;
-import org.apache.geode.internal.cache.eviction.EvictionController;
import org.apache.geode.internal.cache.map.CacheModificationLock;
import org.apache.geode.internal.cache.map.FocusedRegionMap;
import org.apache.geode.internal.cache.map.RegionMapCommitPut;
@@ -85,9 +79,10 @@ import
org.apache.geode.internal.util.concurrent.CustomEntryConcurrentHashMap;
*
* @since GemFire 3.5.1
*/
-public abstract class AbstractRegionMap
- implements RegionMap, FocusedRegionMap, CacheModificationLock {
+public abstract class AbstractRegionMap extends BaseRegionMap
+ implements FocusedRegionMap, CacheModificationLock {
private static final Logger logger = LogService.getLogger();
+ private final EntryEventFactory entryEventFactory = new
EntryEventFactoryImpl();
/** The underlying map for this region. */
protected ConcurrentMapWithReusableEntries<Object, Object> map;
@@ -275,7 +270,6 @@ public abstract class AbstractRegionMap
return (RegionEntry) getEntryMap().get(key);
}
-
@Override
public RegionEntry putEntryIfAbsent(Object key, RegionEntry regionEntry) {
RegionEntry oldRe = (RegionEntry) getEntryMap().putIfAbsent(key,
regionEntry);
@@ -477,31 +471,10 @@ public abstract class AbstractRegionMap
return result;
}
- @Override
- public void lruUpdateCallback() {
- // By default do nothing; LRU maps needs to override this method
- }
-
public void lruUpdateCallback(boolean b) {
// By default do nothing; LRU maps needs to override this method
}
- @Override
- public boolean disableLruUpdateCallback() {
- // By default do nothing; LRU maps needs to override this method
- return false;
- }
-
- @Override
- public void enableLruUpdateCallback() {
- // By default do nothing; LRU maps needs to override this method
- }
-
- @Override
- public void resetThreadLocals() {
- // By default do nothing; LRU maps needs to override this method
- }
-
/**
* Tell an LRU that a new entry has been created
*/
@@ -535,21 +508,6 @@ public abstract class AbstractRegionMap
e.decRefCount(null, lr);
}
- @Override
- public boolean lruLimitExceeded(DiskRegionView diskRegionView) {
- return false;
- }
-
- @Override
- public void lruCloseStats() {
- // do nothing by default
- }
-
- @Override
- public void lruEntryFaultIn(EvictableEntry entry) {
- // do nothing by default
- }
-
/**
* Process an incoming version tag for concurrent operation detection. This
must be done before
* modifying the region entry.
@@ -1058,9 +1016,10 @@ public abstract class AbstractRegionMap
// a receipt of a TXCommitMessage AND there are callbacks
installed
// for this region
@Released
- final EntryEventImpl callbackEvent = createCallbackEvent(owner,
op, key, null, txId,
- txEvent, eventId, aCallbackArgument, filterRoutingInfo,
bridgeContext,
- txEntryState, versionTag, tailKey);
+ final EntryEventImpl callbackEvent = entryEventFactory
+ .createCallbackEvent(owner, op, key, null, txId,
+ txEvent, eventId, aCallbackArgument, filterRoutingInfo,
bridgeContext,
+ txEntryState, versionTag, tailKey);
try {
if (owner.isUsedForPartitionedRegionBucket()) {
@@ -1155,9 +1114,11 @@ public abstract class AbstractRegionMap
} else {
try {
boolean invokeCallbacks = shouldInvokeCallbacks(owner,
isRegionReady || inRI);
- callbackEvent = createCallbackEvent(owner, op, key, null,
txId, txEvent,
- eventId, aCallbackArgument, filterRoutingInfo,
bridgeContext, txEntryState,
- versionTag, tailKey);
+ callbackEvent = entryEventFactory
+ .createCallbackEvent(owner, op, key, null, txId,
txEvent,
+ eventId, aCallbackArgument, filterRoutingInfo,
bridgeContext,
+ txEntryState,
+ versionTag, tailKey);
try {
callbackEvent.setRegionEntry(oldRe);
callbackEvent.setOldValue(Token.NOT_AVAILABLE);
@@ -1218,9 +1179,10 @@ public abstract class AbstractRegionMap
if (!opCompleted) {
opCompleted = true;
boolean invokeCallbacks = shouldInvokeCallbacks(owner,
isRegionReady || inRI);
- callbackEvent = createCallbackEvent(owner, op, key, null, txId,
txEvent, eventId,
- aCallbackArgument, filterRoutingInfo, bridgeContext,
txEntryState, versionTag,
- tailKey);
+ callbackEvent = entryEventFactory
+ .createCallbackEvent(owner, op, key, null, txId, txEvent,
eventId,
+ aCallbackArgument, filterRoutingInfo, bridgeContext,
txEntryState, versionTag,
+ tailKey);
try {
callbackEvent.setRegionEntry(newRe);
callbackEvent.setOldValue(Token.NOT_AVAILABLE);
@@ -1275,8 +1237,10 @@ public abstract class AbstractRegionMap
// Notify clients with client events.
@Released
EntryEventImpl callbackEvent =
- createCallbackEvent(owner, op, key, null, txId, txEvent, eventId,
aCallbackArgument,
- filterRoutingInfo, bridgeContext, txEntryState, versionTag,
tailKey);
+ entryEventFactory
+ .createCallbackEvent(owner, op, key, null, txId, txEvent,
eventId,
+ aCallbackArgument,
+ filterRoutingInfo, bridgeContext, txEntryState,
versionTag, tailKey);
try {
if (owner.isUsedForPartitionedRegionBucket()) {
txHandleWANEvent(owner, callbackEvent, txEntryState);
@@ -1303,29 +1267,6 @@ public abstract class AbstractRegionMap
event.release();
}
- /**
- * If true then invalidates that throw EntryNotFoundException or that are
already invalid will
- * first call afterInvalidate on CacheListeners. The old value on the event
passed to
- * afterInvalidate will be null. If the region is not initialized then
callbacks will not be done.
- * This property only applies to non-transactional invalidates.
Transactional invalidates ignore
- * this property. Note that empty "proxy" regions on a client will not be
sent invalidates from
- * the server unless they also set the proxy InterestPolicy to ALL. If the
invalidate is not sent
- * then this property will not cause a listener on that client to be
notified of the invalidate. A
- * non-empty "caching-proxy" will receive invalidates from the server.
- */
- @MutableForTesting
- public static boolean FORCE_INVALIDATE_EVENT =
- Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX +
"FORCE_INVALIDATE_EVENT");
-
- /**
- * If the FORCE_INVALIDATE_EVENT flag is true then invoke callbacks on the
given event.
- */
- static void forceInvalidateEvent(EntryEventImpl event, LocalRegion owner) {
- if (FORCE_INVALIDATE_EVENT) {
- event.invokeCallbacks(owner, false, false);
- }
- }
-
@Override
public boolean invalidate(EntryEventImpl event, boolean invokeCallbacks,
boolean forceNewEntry,
boolean forceCallbacks) throws EntryNotFoundException {
@@ -1871,7 +1812,7 @@ public abstract class AbstractRegionMap
// for this region
boolean invokeCallbacks = shouldInvokeCallbacks(owner,
owner.isInitialized());
boolean callbackEventInPending = false;
- callbackEvent = createCallbackEvent(owner,
+ callbackEvent = entryEventFactory.createCallbackEvent(owner,
localOp ? Operation.LOCAL_INVALIDATE :
Operation.INVALIDATE, key, newValue,
txId, txEvent, eventId, aCallbackArgument,
filterRoutingInfo, bridgeContext,
txEntryState, versionTag, tailKey);
@@ -1927,7 +1868,7 @@ public abstract class AbstractRegionMap
if (!opCompleted) {
boolean invokeCallbacks = shouldInvokeCallbacks(owner,
owner.isInitialized());
boolean callbackEventInPending = false;
- callbackEvent = createCallbackEvent(owner,
+ callbackEvent = entryEventFactory.createCallbackEvent(owner,
localOp ? Operation.LOCAL_INVALIDATE : Operation.INVALIDATE,
key, newValue, txId,
txEvent, eventId, aCallbackArgument, filterRoutingInfo,
bridgeContext,
txEntryState, versionTag, tailKey);
@@ -1985,7 +1926,7 @@ public abstract class AbstractRegionMap
// for this region
boolean invokeCallbacks = shouldInvokeCallbacks(owner,
owner.isInitialized());
boolean callbackEventInPending = false;
- callbackEvent = createCallbackEvent(owner,
+ callbackEvent = entryEventFactory.createCallbackEvent(owner,
localOp ? Operation.LOCAL_INVALIDATE : Operation.INVALIDATE,
key, newValue, txId,
txEvent, eventId, aCallbackArgument, filterRoutingInfo,
bridgeContext,
txEntryState, versionTag, tailKey);
@@ -2037,7 +1978,7 @@ public abstract class AbstractRegionMap
// provider, thus causing region entry to be absent.
// Notify clients with client events.
boolean callbackEventInPending = false;
- callbackEvent = createCallbackEvent(owner,
+ callbackEvent = entryEventFactory.createCallbackEvent(owner,
localOp ? Operation.LOCAL_INVALIDATE : Operation.INVALIDATE,
key, newValue, txId,
txEvent, eventId, aCallbackArgument, filterRoutingInfo,
bridgeContext, txEntryState,
versionTag, tailKey);
@@ -2189,21 +2130,6 @@ public abstract class AbstractRegionMap
}
/**
- * Switch the event's region from BucketRegion to owning PR and set
originRemote to the given
- * value
- */
- static EntryEventImpl switchEventOwnerAndOriginRemote(EntryEventImpl event,
- boolean originRemote) {
- assert event != null;
- if (event.getRegion().isUsedForPartitionedRegionBucket()) {
- LocalRegion pr = event.getRegion().getPartitionedRegion();
- event.setRegion(pr);
- }
- event.setOriginRemote(originRemote);
- return event;
- }
-
- /**
* Removing the existing indexed value requires the current value in the
cache, that is the one
* prior to applying the operation.
*
@@ -2232,135 +2158,14 @@ public abstract class AbstractRegionMap
}
}
- static boolean shouldInvokeCallbacks(final LocalRegion owner, final boolean
isInitialized) {
- LocalRegion lr = owner;
- boolean isPartitioned = lr.isUsedForPartitionedRegionBucket();
-
- if (isPartitioned) {
- /*
- * if(!((BucketRegion)lr).getBucketAdvisor().isPrimary()) {
- * if(!BucketRegion.FORCE_LOCAL_LISTENERS_INVOCATION) { return false; } }
- */
- lr = owner.getPartitionedRegion();
- }
- return (isPartitioned || isInitialized) &&
(lr.shouldDispatchListenerEvent()
- || lr.shouldNotifyBridgeClients() || lr.getConcurrencyChecksEnabled());
- }
-
EntryEventImpl createTransactionCallbackEvent(final LocalRegion re,
Operation op, Object key,
Object newValue, TransactionId txId, TXRmtEvent txEvent, EventID eventId,
Object aCallbackArgument, FilterRoutingInfo filterRoutingInfo,
ClientProxyMembershipID bridgeContext, TXEntryState txEntryState,
VersionTag versionTag,
long tailKey) {
- return createCallbackEvent(re, op, key, newValue, txId, txEvent, eventId,
aCallbackArgument,
- filterRoutingInfo, bridgeContext, txEntryState, versionTag, tailKey);
- }
-
- /** create a callback event for applying a transactional change to the local
cache */
- @Retained
- public static EntryEventImpl createCallbackEvent(final InternalRegion
internalRegion,
- Operation op, Object key, Object newValue, TransactionId txId,
TXRmtEvent txEvent,
- EventID eventId, Object aCallbackArgument, FilterRoutingInfo
filterRoutingInfo,
- ClientProxyMembershipID bridgeContext, TXEntryState txEntryState,
VersionTag versionTag,
- long tailKey) {
- DistributedMember originator = null;
- // txId should not be null even on localOrigin
- Assert.assertTrue(txId != null);
- originator = txId.getMemberId();
-
- InternalRegion eventRegion = internalRegion;
- if (eventRegion.isUsedForPartitionedRegionBucket()) {
- eventRegion = internalRegion.getPartitionedRegion();
- }
-
- @Retained
- EntryEventImpl retVal = EntryEventImpl.create(internalRegion, op, key,
newValue,
- aCallbackArgument, txEntryState == null, originator);
- boolean returnedRetVal = false;
- try {
-
-
- if (bridgeContext != null) {
- retVal.setContext(bridgeContext);
- }
-
- if (eventRegion.generateEventID()) {
- retVal.setEventId(eventId);
- }
-
- if (versionTag != null) {
- retVal.setVersionTag(versionTag);
- }
-
- retVal.setTailKey(tailKey);
-
- FilterInfo localRouting = null;
- boolean computeFilterInfo = false;
- if (filterRoutingInfo == null) {
- computeFilterInfo = true;
- } else {
- localRouting = filterRoutingInfo.getLocalFilterInfo();
- if (localRouting != null) {
- // routing was computed in this VM but may need to perform local
interest processing
- computeFilterInfo =
!filterRoutingInfo.hasLocalInterestBeenComputed();
- } else {
- // routing was computed elsewhere and is in the "remote" routing
table
- localRouting =
filterRoutingInfo.getFilterInfo(internalRegion.getMyId());
- }
- if (localRouting != null) {
- if (!computeFilterInfo) {
- retVal.setLocalFilterInfo(localRouting);
- }
- } else {
- computeFilterInfo = true;
- }
- }
- if (logger.isTraceEnabled()) {
- logger.trace("createCBEvent filterRouting={} computeFilterInfo={}
local routing={}",
- filterRoutingInfo, computeFilterInfo, localRouting);
- }
-
- if (internalRegion.isUsedForPartitionedRegionBucket()) {
- BucketRegion bucket = (BucketRegion) internalRegion;
- if (BucketRegion.FORCE_LOCAL_LISTENERS_INVOCATION
- || bucket.getBucketAdvisor().isPrimary()) {
- retVal.setInvokePRCallbacks(true);
- } else {
- retVal.setInvokePRCallbacks(false);
- }
-
- if (computeFilterInfo) {
- if (bucket.getBucketAdvisor().isPrimary()) {
- if (logger.isTraceEnabled()) {
- logger.trace("createCBEvent computing routing for primary
bucket");
- }
- FilterProfile fp =
- ((BucketRegion)
internalRegion).getPartitionedRegion().getFilterProfile();
- if (fp != null) {
- FilterRoutingInfo fri =
fp.getFilterRoutingInfoPart2(filterRoutingInfo, retVal);
- if (fri != null) {
- retVal.setLocalFilterInfo(fri.getLocalFilterInfo());
- }
- }
- }
- }
- } else if (computeFilterInfo) { // not a bucket
- if (logger.isTraceEnabled()) {
- logger.trace("createCBEvent computing routing for non-bucket");
- }
- FilterProfile fp = internalRegion.getFilterProfile();
- if (fp != null) {
- retVal.setLocalFilterInfo(fp.getLocalFilterRouting(retVal));
- }
- }
- retVal.setTransactionId(txId);
- returnedRetVal = true;
- return retVal;
- } finally {
- if (!returnedRetVal) {
- retVal.release();
- }
- }
+ return entryEventFactory
+ .createCallbackEvent(re, op, key, newValue, txId, txEvent, eventId,
aCallbackArgument,
+ filterRoutingInfo, bridgeContext, txEntryState, versionTag,
tailKey);
}
@Override
@@ -2616,62 +2421,10 @@ public abstract class AbstractRegionMap
}
@Override
- public long getEvictions() {
- return 0;
- }
-
- @Override
- public void incRecentlyUsed() {
- // nothing by default
- }
-
- @Override
- public EvictionController getEvictionController() {
- return null;
- }
-
- @Override
public int getEntryOverhead() {
return (int)
ReflectionSingleObjectSizer.sizeof(getEntryFactory().getEntryClass());
}
- @Override
- public boolean beginChangeValueForm(EvictableEntry le,
- CachedDeserializable vmCachedDeserializable, Object v) {
- return false;
- }
-
- @Override
- public void finishChangeValueForm() {}
-
- @Override
- public int centralizedLruUpdateCallback() {
- return 0;
- }
-
- @Override
- public void updateEvictionCounter() {}
-
- public interface ARMLockTestHook {
- void beforeBulkLock(InternalRegion region);
-
- void afterBulkLock(InternalRegion region);
-
- void beforeBulkRelease(InternalRegion region);
-
- void afterBulkRelease(InternalRegion region);
-
- void beforeLock(InternalRegion region, CacheEvent event);
-
- void afterLock(InternalRegion region, CacheEvent event);
-
- void beforeRelease(InternalRegion region, CacheEvent event);
-
- void afterRelease(InternalRegion region, CacheEvent event);
-
- void beforeStateFlushWait();
- }
-
private ARMLockTestHook armLockTestHook;
@Override
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/BaseRegionMap.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/BaseRegionMap.java
new file mode 100644
index 0000000..9893f80
--- /dev/null
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/BaseRegionMap.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.annotations.internal.MutableForTesting;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.internal.cache.eviction.EvictableEntry;
+import org.apache.geode.internal.cache.eviction.EvictionController;
+import org.apache.geode.internal.cache.persistence.DiskRegionView;
+import org.apache.geode.internal.logging.LogService;
+
+/**
+ * BaseRegionMap consolidates common behaviour between {@link
AbstractRegionMap} and
+ * {@link ProxyRegionMap}. {@link AbstractRegionMap} should be extended when
implementing
+ * a new {@link RegionMap} and {@link ProxyRegionMap} should be used if an
empty {@link RegionMap}
+ * for regions whose DataPolicy is Proxy is required.
+ */
+abstract class BaseRegionMap implements RegionMap {
+ private static final Logger logger = LogService.getLogger();
+
+ @Override
+ public void incRecentlyUsed() {
+ // nothing
+ }
+
+ @Override
+ public long getEvictions() {
+ return 0;
+ }
+
+ @Override
+ public EvictionController getEvictionController() {
+ return null;
+ }
+
+ @Override
+ public void lruUpdateCallback() {
+ // nothing needed
+ }
+
+ @Override
+ public boolean lruLimitExceeded(DiskRegionView diskRegionView) {
+ return false;
+ }
+
+ @Override
+ public void lruCloseStats() {
+ // nothing needed
+ }
+
+ @Override
+ public void resetThreadLocals() {
+ // By default do nothing; LRU maps needs to override this method
+ }
+
+
+ @Override
+ public boolean disableLruUpdateCallback() {
+ // nothing needed
+ return false;
+ }
+
+ @Override
+ public void enableLruUpdateCallback() {
+ // By default do nothing; LRU maps needs to override this method
+ }
+
+ @Override
+ public int centralizedLruUpdateCallback() {
+ return 0;
+ }
+
+ @Override
+ public void updateEvictionCounter() {}
+
+ @Override
+ public void finishChangeValueForm() {}
+
+ @Override
+ public boolean beginChangeValueForm(EvictableEntry le,
+ CachedDeserializable vmCachedDeserializable, Object v) {
+ return false;
+ }
+
+ @Override
+ public void lruEntryFaultIn(EvictableEntry entry) {
+ // do nothing by default
+ }
+
+ /**
+ * If true then invalidates that throw EntryNotFoundException or that are
already invalid will
+ * first call afterInvalidate on CacheListeners. The old value on the event
passed to
+ * afterInvalidate will be null. If the region is not initialized then
callbacks will not be done.
+ * This property only applies to non-transactional invalidates.
Transactional invalidates ignore
+ * this property. Note that empty "proxy" regions on a client will not be
sent invalidates from
+ * the server unless they also set the proxy InterestPolicy to ALL. If the
invalidate is not sent
+ * then this property will not cause a listener on that client to be
notified of the invalidate. A
+ * non-empty "caching-proxy" will receive invalidates from the server.
+ */
+ @MutableForTesting
+ public static boolean FORCE_INVALIDATE_EVENT =
+ Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX +
"FORCE_INVALIDATE_EVENT");
+
+ /**
+ * If the FORCE_INVALIDATE_EVENT flag is true then invoke callbacks on the
given event.
+ */
+ static void forceInvalidateEvent(EntryEventImpl event, LocalRegion owner) {
+ if (FORCE_INVALIDATE_EVENT) {
+ event.invokeCallbacks(owner, false, false);
+ }
+ }
+
+ static boolean shouldInvokeCallbacks(final LocalRegion owner, final boolean
isInitialized) {
+ LocalRegion lr = owner;
+ boolean isPartitioned = lr.isUsedForPartitionedRegionBucket();
+
+ if (isPartitioned) {
+ /*
+ * if(!((BucketRegion)lr).getBucketAdvisor().isPrimary()) {
+ * if(!BucketRegion.FORCE_LOCAL_LISTENERS_INVOCATION) { return false; } }
+ */
+ lr = owner.getPartitionedRegion();
+ }
+ return (isPartitioned || isInitialized) &&
(lr.shouldDispatchListenerEvent()
+ || lr.shouldNotifyBridgeClients() || lr.getConcurrencyChecksEnabled());
+ }
+
+ /**
+ * Switch the event's region from BucketRegion to owning PR and set
originRemote to the given
+ * value
+ */
+ static EntryEventImpl switchEventOwnerAndOriginRemote(EntryEventImpl event,
+ boolean originRemote) {
+ assert event != null;
+ if (event.getRegion().isUsedForPartitionedRegionBucket()) {
+ LocalRegion pr = event.getRegion().getPartitionedRegion();
+ event.setRegion(pr);
+ }
+ event.setOriginRemote(originRemote);
+ return event;
+ }
+}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index 6dfa6ab..ddc0319 100755
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -85,9 +85,9 @@ import
org.apache.geode.distributed.internal.locks.DLockRemoteToken;
import org.apache.geode.distributed.internal.locks.DLockService;
import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
-import org.apache.geode.internal.cache.AbstractRegionMap.ARMLockTestHook;
import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile;
import org.apache.geode.internal.cache.InitialImageOperation.GIIStatus;
+import org.apache.geode.internal.cache.RegionMap.ARMLockTestHook;
import
org.apache.geode.internal.cache.control.InternalResourceManager.ResourceType;
import org.apache.geode.internal.cache.control.MemoryEvent;
import org.apache.geode.internal.cache.event.DistributedEventTracker;
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventFactory.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventFactory.java
new file mode 100644
index 0000000..881e2f5
--- /dev/null
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.TransactionId;
+import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
+import org.apache.geode.internal.cache.versions.VersionTag;
+import org.apache.geode.internal.offheap.annotations.Retained;
+
+/**
+ * Factory for creating instances of {@link EntryEventImpl}
+ */
+public interface EntryEventFactory {
+ @Retained
+ EntryEventImpl createCallbackEvent(InternalRegion internalRegion,
+ Operation op, Object key, Object newValue,
+ TransactionId txId, TXRmtEvent txEvent,
+ EventID eventId, Object aCallbackArgument,
+ FilterRoutingInfo filterRoutingInfo,
+ ClientProxyMembershipID bridgeContext,
+ TXEntryState txEntryState, VersionTag versionTag,
+ long tailKey);
+}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventFactoryImpl.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventFactoryImpl.java
new file mode 100644
index 0000000..cf8dcf1
--- /dev/null
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventFactoryImpl.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.TransactionId;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.internal.Assert;
+import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
+import org.apache.geode.internal.cache.versions.VersionTag;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.offheap.annotations.Retained;
+
+/**
+ * Implementation of {@link EntryEventFactory}
+ */
+public class EntryEventFactoryImpl implements EntryEventFactory {
+ private static final Logger logger = LogService.getLogger();
+
+ /** create a callback event for applying a transactional change to the local
cache */
+ @Override
+ @Retained
+ public EntryEventImpl createCallbackEvent(final InternalRegion
internalRegion,
+ Operation op, Object key, Object newValue,
+ TransactionId txId, TXRmtEvent txEvent,
+ EventID eventId, Object aCallbackArgument,
+ FilterRoutingInfo filterRoutingInfo,
+ ClientProxyMembershipID bridgeContext,
+ TXEntryState txEntryState, VersionTag versionTag,
+ long tailKey) {
+ DistributedMember originator = null;
+ // txId should not be null even on localOrigin
+ Assert.assertTrue(txId != null);
+ originator = txId.getMemberId();
+
+ InternalRegion eventRegion = internalRegion;
+ if (eventRegion.isUsedForPartitionedRegionBucket()) {
+ eventRegion = internalRegion.getPartitionedRegion();
+ }
+
+ @Retained
+ EntryEventImpl retVal = EntryEventImpl.create(internalRegion, op, key,
newValue,
+ aCallbackArgument, txEntryState == null, originator);
+ boolean returnedRetVal = false;
+ try {
+
+
+ if (bridgeContext != null) {
+ retVal.setContext(bridgeContext);
+ }
+
+ if (eventRegion.generateEventID()) {
+ retVal.setEventId(eventId);
+ }
+
+ if (versionTag != null) {
+ retVal.setVersionTag(versionTag);
+ }
+
+ retVal.setTailKey(tailKey);
+
+ FilterRoutingInfo.FilterInfo localRouting = null;
+ boolean computeFilterInfo = false;
+ if (filterRoutingInfo == null) {
+ computeFilterInfo = true;
+ } else {
+ localRouting = filterRoutingInfo.getLocalFilterInfo();
+ if (localRouting != null) {
+ // routing was computed in this VM but may need to perform local
interest processing
+ computeFilterInfo =
!filterRoutingInfo.hasLocalInterestBeenComputed();
+ } else {
+ // routing was computed elsewhere and is in the "remote" routing
table
+ localRouting =
filterRoutingInfo.getFilterInfo(internalRegion.getMyId());
+ }
+ if (localRouting != null) {
+ if (!computeFilterInfo) {
+ retVal.setLocalFilterInfo(localRouting);
+ }
+ } else {
+ computeFilterInfo = true;
+ }
+ }
+ if (EntryEventFactoryImpl.logger.isTraceEnabled()) {
+ EntryEventFactoryImpl.logger.trace(
+ "createCBEvent filterRouting={} computeFilterInfo={} local
routing={}",
+ filterRoutingInfo, computeFilterInfo, localRouting);
+ }
+
+ if (internalRegion.isUsedForPartitionedRegionBucket()) {
+ BucketRegion bucket = (BucketRegion) internalRegion;
+ if (BucketRegion.FORCE_LOCAL_LISTENERS_INVOCATION
+ || bucket.getBucketAdvisor().isPrimary()) {
+ retVal.setInvokePRCallbacks(true);
+ } else {
+ retVal.setInvokePRCallbacks(false);
+ }
+
+ if (computeFilterInfo) {
+ if (bucket.getBucketAdvisor().isPrimary()) {
+ if (EntryEventFactoryImpl.logger.isTraceEnabled()) {
+ EntryEventFactoryImpl.logger
+ .trace("createCBEvent computing routing for primary bucket");
+ }
+ FilterProfile fp =
+ ((BucketRegion)
internalRegion).getPartitionedRegion().getFilterProfile();
+ if (fp != null) {
+ FilterRoutingInfo fri =
fp.getFilterRoutingInfoPart2(filterRoutingInfo, retVal);
+ if (fri != null) {
+ retVal.setLocalFilterInfo(fri.getLocalFilterInfo());
+ }
+ }
+ }
+ }
+ } else if (computeFilterInfo) { // not a bucket
+ if (EntryEventFactoryImpl.logger.isTraceEnabled()) {
+ EntryEventFactoryImpl.logger.trace("createCBEvent computing routing
for non-bucket");
+ }
+ FilterProfile fp = internalRegion.getFilterProfile();
+ if (fp != null) {
+ retVal.setLocalFilterInfo(fp.getLocalFilterRouting(retVal));
+ }
+ }
+ retVal.setTransactionId(txId);
+ returnedRetVal = true;
+ return retVal;
+ } finally {
+ if (!returnedRetVal) {
+ retVal.release();
+ }
+ }
+ }
+}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 6855aed..91a9328 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -159,12 +159,12 @@ import org.apache.geode.internal.ClassLoadUtil;
import org.apache.geode.internal.HeapDataOutputStream;
import org.apache.geode.internal.NanoTimer;
import org.apache.geode.internal.Version;
-import org.apache.geode.internal.cache.AbstractRegionMap.ARMLockTestHook;
import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile;
import org.apache.geode.internal.cache.DiskInitFile.DiskRegionFlag;
import org.apache.geode.internal.cache.FilterRoutingInfo.FilterInfo;
import org.apache.geode.internal.cache.InitialImageOperation.GIIStatus;
import
org.apache.geode.internal.cache.PutAllPartialResultException.PutAllPartialResult;
+import org.apache.geode.internal.cache.RegionMap.ARMLockTestHook;
import org.apache.geode.internal.cache.control.InternalResourceManager;
import
org.apache.geode.internal.cache.control.InternalResourceManager.ResourceType;
import org.apache.geode.internal.cache.control.MemoryEvent;
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyRegionMap.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyRegionMap.java
index 76f994d..b231d1d 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyRegionMap.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyRegionMap.java
@@ -34,13 +34,9 @@ import
org.apache.geode.distributed.internal.membership.InternalDistributedMembe
import org.apache.geode.internal.ByteArrayDataInput;
import org.apache.geode.internal.InternalStatisticsDisabledException;
import org.apache.geode.internal.Version;
-import org.apache.geode.internal.cache.AbstractRegionMap.ARMLockTestHook;
import org.apache.geode.internal.cache.InitialImageOperation.Entry;
import org.apache.geode.internal.cache.entries.DiskEntry;
-import org.apache.geode.internal.cache.eviction.EvictableEntry;
-import org.apache.geode.internal.cache.eviction.EvictionController;
import org.apache.geode.internal.cache.eviction.EvictionList;
-import org.apache.geode.internal.cache.persistence.DiskRegionView;
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
import org.apache.geode.internal.cache.versions.RegionVersionVector;
import org.apache.geode.internal.cache.versions.VersionHolder;
@@ -56,7 +52,9 @@ import
org.apache.geode.internal.util.concurrent.ConcurrentMapWithReusableEntrie
*
* @since GemFire 5.0
*/
-class ProxyRegionMap implements RegionMap {
+class ProxyRegionMap extends BaseRegionMap {
+
+ private final EntryEventFactory entryEventFactory = new
EntryEventFactoryImpl();
protected ProxyRegionMap(LocalRegion owner, Attributes attr,
InternalRegionArguments internalRegionArgs) {
@@ -186,7 +184,7 @@ class ProxyRegionMap implements RegionMap {
if (event.getOperation().isLocal()) {
if (this.owner.isInitialized()) {
- AbstractRegionMap.forceInvalidateEvent(event, this.owner);
+ forceInvalidateEvent(event, this.owner);
}
throw new EntryNotFoundException(event.getKey().toString());
}
@@ -272,13 +270,14 @@ class ProxyRegionMap implements RegionMap {
if (event != null) {
event.addDestroy(this.owner, markerEntry, key, aCallbackArgument);
}
- if (AbstractRegionMap.shouldInvokeCallbacks(this.owner, !inTokenMode)) {
+ if (shouldInvokeCallbacks(this.owner, !inTokenMode)) {
// fix for bug 39526
@Released
- EntryEventImpl e = AbstractRegionMap.createCallbackEvent(this.owner,
op, key, null,
- rmtOrigin, event, eventId, aCallbackArgument, filterRoutingInfo,
bridgeContext,
- txEntryState, versionTag, tailKey);
- AbstractRegionMap.switchEventOwnerAndOriginRemote(e, txEntryState ==
null);
+ EntryEventImpl e =
+ entryEventFactory.createCallbackEvent(this.owner, op, key, null,
+ rmtOrigin, event, eventId, aCallbackArgument,
filterRoutingInfo, bridgeContext,
+ txEntryState, versionTag, tailKey);
+ switchEventOwnerAndOriginRemote(e, txEntryState == null);
pendingCallbacks.add(e);
}
}
@@ -295,14 +294,14 @@ class ProxyRegionMap implements RegionMap {
if (event != null) {
event.addInvalidate(this.owner, markerEntry, key, newValue,
aCallbackArgument);
}
- if (AbstractRegionMap.shouldInvokeCallbacks(this.owner,
this.owner.isInitialized())) {
+ if (shouldInvokeCallbacks(this.owner, this.owner.isInitialized())) {
// fix for bug 39526
@Released
- EntryEventImpl e = AbstractRegionMap.createCallbackEvent(this.owner,
+ EntryEventImpl e = entryEventFactory.createCallbackEvent(this.owner,
localOp ? Operation.LOCAL_INVALIDATE : Operation.INVALIDATE, key,
newValue, rmtOrigin,
event, eventId, aCallbackArgument, filterRoutingInfo,
bridgeContext, txEntryState,
versionTag, tailKey);
- AbstractRegionMap.switchEventOwnerAndOriginRemote(e, txEntryState ==
null);
+ switchEventOwnerAndOriginRemote(e, txEntryState == null);
pendingCallbacks.add(e);
}
}
@@ -321,56 +320,25 @@ class ProxyRegionMap implements RegionMap {
if (event != null) {
event.addPut(putOperation, this.owner, markerEntry, key, newValue,
aCallbackArgument);
}
- if (AbstractRegionMap.shouldInvokeCallbacks(this.owner,
this.owner.isInitialized())) {
+ if (shouldInvokeCallbacks(this.owner, this.owner.isInitialized())) {
// fix for bug 39526
@Released
- EntryEventImpl e = AbstractRegionMap.createCallbackEvent(this.owner,
putOperation, key,
- newValue, rmtOrigin, event, eventId, aCallbackArgument,
filterRoutingInfo,
- bridgeContext, txEntryState, versionTag, tailKey);
- AbstractRegionMap.switchEventOwnerAndOriginRemote(e, txEntryState ==
null);
+ EntryEventImpl e = entryEventFactory
+ .createCallbackEvent(this.owner, putOperation, key,
+ newValue, rmtOrigin, event, eventId, aCallbackArgument,
filterRoutingInfo,
+ bridgeContext, txEntryState, versionTag, tailKey);
+ switchEventOwnerAndOriginRemote(e, txEntryState == null);
pendingCallbacks.add(e);
}
}
}
- // LRUMapCallbacks methods
- @Override
- public void lruUpdateCallback() {
- // nothing needed
- }
-
- @Override
- public boolean disableLruUpdateCallback() {
- // nothing needed
- return false;
- }
-
- @Override
- public void enableLruUpdateCallback() {
- // nothing needed
- }
-
@Override
public void decTxRefCount(RegionEntry e) {
// nothing needed
}
@Override
- public boolean lruLimitExceeded(DiskRegionView diskRegionView) {
- return false;
- }
-
- @Override
- public void lruCloseStats() {
- // nothing needed
- }
-
- @Override
- public void resetThreadLocals() {
- // nothing needed
- }
-
- @Override
public void removeEntry(Object key, RegionEntry value, boolean updateStats) {
// nothing to do
}
@@ -794,12 +762,6 @@ class ProxyRegionMap implements RegionMap {
}
@Override
- public void lruEntryFaultIn(EvictableEntry entry) {
- // do nothing.
-
- }
-
- @Override
public void copyRecoveredEntries(RegionMap rm) {
throw new IllegalStateException("copyRecoveredEntries should never be
called on proxy");
}
@@ -851,49 +813,15 @@ class ProxyRegionMap implements RegionMap {
}
@Override
- public long getEvictions() {
- return 0;
- }
-
- @Override
- public void incRecentlyUsed() {
- // nothing
- }
-
- @Override
- public EvictionController getEvictionController() {
- return null;
- }
-
- @Override
public int getEntryOverhead() {
return 0;
}
@Override
- public boolean beginChangeValueForm(EvictableEntry le,
- CachedDeserializable vmCachedDeserializable, Object v) {
- return false;
- }
-
- @Override
- public void finishChangeValueForm() {}
-
- @Override
- public int centralizedLruUpdateCallback() {
- return 0;
- }
-
- @Override
- public void updateEvictionCounter() {}
-
- @Override
public ConcurrentMapWithReusableEntries<Object, Object>
getCustomEntryConcurrentHashMap() {
return null;
}
@Override
- public void setEntryMap(ConcurrentMapWithReusableEntries<Object, Object>
map) {
-
- }
+ public void setEntryMap(ConcurrentMapWithReusableEntries<Object, Object>
map) {}
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionMap.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionMap.java
index 140076e..5e8273c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionMap.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionMap.java
@@ -19,13 +19,13 @@ import java.util.List;
import java.util.Set;
import org.apache.geode.cache.CacheCallback;
+import org.apache.geode.cache.CacheEvent;
import org.apache.geode.cache.CacheWriterException;
import org.apache.geode.cache.EntryNotFoundException;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.TimeoutException;
import org.apache.geode.cache.TransactionId;
import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.cache.AbstractRegionMap.ARMLockTestHook;
import org.apache.geode.internal.cache.entries.DiskEntry;
import org.apache.geode.internal.cache.eviction.EvictableEntry;
import org.apache.geode.internal.cache.eviction.EvictableMap;
@@ -44,6 +44,26 @@ import
org.apache.geode.internal.util.concurrent.ConcurrentMapWithReusableEntrie
*/
public interface RegionMap extends EvictableMap {
+ interface ARMLockTestHook {
+ void beforeBulkLock(InternalRegion region);
+
+ void afterBulkLock(InternalRegion region);
+
+ void beforeBulkRelease(InternalRegion region);
+
+ void afterBulkRelease(InternalRegion region);
+
+ void beforeLock(InternalRegion region, CacheEvent event);
+
+ void afterLock(InternalRegion region, CacheEvent event);
+
+ void beforeRelease(InternalRegion region, CacheEvent event);
+
+ void afterRelease(InternalRegion region, CacheEvent event);
+
+ void beforeStateFlushWait();
+ }
+
/**
* Parameter object used to facilitate construction of an EntriesMap.
Modification of fields after
* the map is constructed has no effect.
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
index 2a3f022..b49d1aa 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
@@ -1074,6 +1074,7 @@ public class TXCommitMessage extends
PooledDistributionMessage
}
public static class RegionCommit {
+ private final EntryEventFactory entryEventFactory = new
EntryEventFactoryImpl();
/**
* The region that this commit represents. Valid on both nearside and
farside.
*/
@@ -1273,7 +1274,8 @@ public class TXCommitMessage extends
PooledDistributionMessage
// No need to release because it is added to pendingCallbacks and they
will be released
// later
EntryEventImpl eei =
- AbstractRegionMap.createCallbackEvent(this.internalRegion,
entryOp.op, entryOp.key,
+ entryEventFactory.createCallbackEvent(this.internalRegion,
entryOp.op,
+ entryOp.key,
entryOp.value, this.msg.txIdent, txEvent, getEventId(entryOp),
entryOp.callbackArg,
entryOp.filterRoutingInfo, this.msg.bridgeContext, null,
entryOp.versionTag,
entryOp.tailKey);
@@ -1352,7 +1354,8 @@ public class TXCommitMessage extends
PooledDistributionMessage
*/
@Released
EntryEventImpl eei =
- AbstractRegionMap.createCallbackEvent(this.internalRegion,
entryOp.op, entryOp.key,
+ entryEventFactory.createCallbackEvent(this.internalRegion,
entryOp.op,
+ entryOp.key,
entryOp.value, this.msg.txIdent, txEvent, getEventId(entryOp),
entryOp.callbackArg,
entryOp.filterRoutingInfo, this.msg.bridgeContext, null,
entryOp.versionTag,
entryOp.tailKey);