This is an automated email from the ASF dual-hosted git repository. zhouxj pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new 5c37893 GEODE-5087: send a BatchDestroyOperation for each dropped event at serial primary sender (#1924) 5c37893 is described below commit 5c378931c672d695f168d2aca0848664cb4c2f2f Author: Xiaojian Zhou <gesterz...@users.noreply.github.com> AuthorDate: Fri May 11 18:04:51 2018 -0700 GEODE-5087: send a BatchDestroyOperation for each dropped event at serial primary sender (#1924) --- .../internal/ParallelAsyncEventQueueImpl.java | 2 +- .../internal/SerialAsyncEventQueueImpl.java | 2 +- .../internal/cache/wan/AbstractGatewaySender.java | 68 +++++++++++++--------- .../wan/AbstractGatewaySenderEventProcessor.java | 33 +---------- ...currentParallelGatewaySenderEventProcessor.java | 32 ++++++++++ .../ParallelGatewaySenderEventProcessor.java | 6 ++ .../cache/wan/serial/BatchDestroyOperation.java | 28 ++++++++- ...oncurrentSerialGatewaySenderEventProcessor.java | 64 ++++++++++++-------- .../serial/SerialGatewaySenderEventProcessor.java | 47 +++++++++++++-- .../xmlcache/ParallelAsyncEventQueueCreation.java | 2 +- .../xmlcache/ParallelGatewaySenderCreation.java | 2 +- .../xmlcache/SerialAsyncEventQueueCreation.java | 2 +- .../xmlcache/SerialGatewaySenderCreation.java | 2 +- .../wan/parallel/ParallelGatewaySenderImpl.java | 2 +- .../cache/wan/serial/SerialGatewaySenderImpl.java | 6 +- .../SerialGatewaySenderOperationsDUnitTest.java | 2 - 16 files changed, 199 insertions(+), 101 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java index 538b65a..8e2e4e4 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java +++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java @@ -168,7 +168,7 @@ public class ParallelAsyncEventQueueImpl extends AbstractGatewaySender { } @Override - protected void setModifiedEventId(EntryEventImpl clonedEvent) { + public void setModifiedEventId(EntryEventImpl clonedEvent) { int bucketId = -1; // merged from 42004 if (clonedEvent.getRegion() instanceof DistributedRegion) { diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java index 9e0239d..400126d 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java +++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java @@ -225,7 +225,7 @@ public class SerialAsyncEventQueueImpl extends AbstractGatewaySender { * internal.cache.EntryEventImpl) */ @Override - protected void setModifiedEventId(EntryEventImpl clonedEvent) { + public void setModifiedEventId(EntryEventImpl clonedEvent) { EventID originalEventId = clonedEvent.getEventId(); long originalThreadId = originalEventId.getThreadID(); long newThreadId = originalThreadId; diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java index 123534a..149fa48 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java @@ -173,6 +173,9 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi protected volatile ConcurrentLinkedQueue<TmpQueueEvent> tmpQueuedEvents = new ConcurrentLinkedQueue<>(); + + protected volatile ConcurrentLinkedQueue<EntryEventImpl> tmpDroppedEvents = + new ConcurrentLinkedQueue<>(); /** * The number of seconds to wait before stopping the GatewaySender. Default is 0 seconds. */ @@ -836,40 +839,43 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi final boolean isDebugEnabled = logger.isDebugEnabled(); - // If this gateway is not running, return - if (!isRunning()) { - if (isDebugEnabled) { - logger.debug("Returning back without putting into the gateway sender queue:" + event); - } - if (this.eventProcessor != null) { - this.eventProcessor.registerEventDroppedInPrimaryQueue(event); - } - return; - } - - final GatewaySenderStats stats = getStatistics(); - stats.incEventsReceived(); - - if (!checkForDistribution(event, stats)) { - stats.incEventsNotQueued(); - return; - } - - // this filter is defined by Asif which exist in old wan too. new wan has - // other GatewaEventFilter. Do we need to get rid of this filter. Cheetah is - // not considering this filter - if (!this.filter.enqueueEvent(event)) { - stats.incEventsFiltered(); - return; - } // released by this method or transfers ownership to TmpQueueEvent @Released EntryEventImpl clonedEvent = new EntryEventImpl(event, false); boolean freeClonedEvent = true; try { - Region region = event.getRegion(); + // If this gateway is not running, return + if (!isRunning()) { + if (this.isPrimary()) { + tmpDroppedEvents.add(clonedEvent); + if (isDebugEnabled) { + logger.debug("add to tmpDroppedEvents for evnet {}", clonedEvent); + } + } + if (isDebugEnabled) { + logger.debug("Returning back without putting into the gateway sender queue:" + event); + } + return; + } + + final GatewaySenderStats stats = getStatistics(); + stats.incEventsReceived(); + + if (!checkForDistribution(event, stats)) { + stats.incEventsNotQueued(); + return; + } + // this filter is defined by Asif which exist in old wan too. new wan has + // other GatewaEventFilter. Do we need to get rid of this filter. Cheetah is + // not considering this filter + if (!this.filter.enqueueEvent(event)) { + stats.incEventsFiltered(); + return; + } + + // start to distribute setModifiedEventId(clonedEvent); Object callbackArg = clonedEvent.getRawCallbackArgument(); @@ -1016,6 +1022,12 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi */ public void enqueueTempEvents() { if (this.eventProcessor != null) {// Fix for defect #47308 + // process tmpDroppedEvents + EntryEventImpl droppedEvent = null; + while ((droppedEvent = tmpDroppedEvents.poll()) != null) { + this.eventProcessor.registerEventDroppedInPrimaryQueue(droppedEvent); + } + TmpQueueEvent nextEvent = null; final GatewaySenderStats stats = getStatistics(); try { @@ -1216,7 +1228,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi return region; } - protected abstract void setModifiedEventId(EntryEventImpl clonedEvent); + public abstract void setModifiedEventId(EntryEventImpl clonedEvent); public static class DefaultGatewayEventFilter implements org.apache.geode.internal.cache.GatewayEventFilter { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java index 2ce06c6..89fa586 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java @@ -33,7 +33,6 @@ import org.apache.geode.GemFireException; import org.apache.geode.SystemFailure; import org.apache.geode.cache.CacheException; import org.apache.geode.cache.EntryEvent; -import org.apache.geode.cache.EntryOperation; import org.apache.geode.cache.Operation; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionDestroyedException; @@ -50,7 +49,6 @@ import org.apache.geode.internal.cache.EventID; import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.LocalRegion; import org.apache.geode.internal.cache.PartitionedRegion; -import org.apache.geode.internal.cache.PartitionedRegionHelper; import org.apache.geode.internal.cache.RegionQueue; import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue; import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue; @@ -279,36 +277,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { return this.queue.size(); } - public void registerEventDroppedInPrimaryQueue(EntryEventImpl event) { - if (queue == null) { - return; - } - if (this.queue instanceof ConcurrentParallelGatewaySenderQueue) { - ConcurrentParallelGatewaySenderQueue cpgsq = (ConcurrentParallelGatewaySenderQueue) queue; - PartitionedRegion prQ = cpgsq.getRegion(event.getRegion().getFullPath()); - if (prQ == null) { - if (logger.isDebugEnabled()) { - logger.debug("shadow partitioned region " + event.getRegion().getFullPath() - + " is not created yet."); - } - return; - } - int bucketId = PartitionedRegionHelper.getHashKey((EntryOperation) event); - long shadowKey = event.getTailKey(); - - ParallelGatewaySenderQueue pgsq = - (ParallelGatewaySenderQueue) cpgsq.getQueueByBucket(bucketId); - boolean isPrimary = prQ.getRegionAdvisor().getBucketAdvisor(bucketId).isPrimary(); - if (isPrimary) { - pgsq.sendQueueRemovalMesssageForDroppedEvent(prQ, bucketId, shadowKey); - this.sender.getStatistics().incEventsDroppedDueToPrimarySenderNotRunning(); - if (logger.isDebugEnabled()) { - logger.debug("register dropped event for primary queue. BucketId is " + bucketId - + ", shadowKey is " + shadowKey + ", prQ is " + prQ.getFullPath()); - } - } - } - } + protected abstract void registerEventDroppedInPrimaryQueue(EntryEventImpl droppedEvent); /** * @return the sender diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java index 54b7034..6b8cce1 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java @@ -32,12 +32,15 @@ import org.apache.geode.GemFireException; import org.apache.geode.InternalGemFireException; import org.apache.geode.cache.CacheException; import org.apache.geode.cache.EntryEvent; +import org.apache.geode.cache.EntryOperation; import org.apache.geode.cache.Region; import org.apache.geode.cache.wan.GatewayQueueEvent; import org.apache.geode.internal.cache.EntryEventImpl; import org.apache.geode.internal.cache.EnumListenerEvent; import org.apache.geode.internal.cache.InternalRegion; import org.apache.geode.internal.cache.LocalRegion; +import org.apache.geode.internal.cache.PartitionedRegion; +import org.apache.geode.internal.cache.PartitionedRegionHelper; import org.apache.geode.internal.cache.RegionQueue; import org.apache.geode.internal.cache.wan.AbstractGatewaySender; import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor; @@ -138,6 +141,35 @@ public class ConcurrentParallelGatewaySenderEventProcessor } @Override + protected void registerEventDroppedInPrimaryQueue(EntryEventImpl droppedEvent) { + if (queue == null) { + return; + } + ConcurrentParallelGatewaySenderQueue cpgsq = (ConcurrentParallelGatewaySenderQueue) queue; + PartitionedRegion prQ = cpgsq.getRegion(droppedEvent.getRegion().getFullPath()); + if (prQ == null) { + if (logger.isDebugEnabled()) { + logger.debug("shadow partitioned region " + droppedEvent.getRegion().getFullPath() + + " is not created yet."); + } + return; + } + int bucketId = PartitionedRegionHelper.getHashKey((EntryOperation) droppedEvent); + long shadowKey = droppedEvent.getTailKey(); + + ParallelGatewaySenderQueue pgsq = (ParallelGatewaySenderQueue) cpgsq.getQueueByBucket(bucketId); + boolean isPrimary = prQ.getRegionAdvisor().getBucketAdvisor(bucketId).isPrimary(); + if (isPrimary) { + pgsq.sendQueueRemovalMesssageForDroppedEvent(prQ, bucketId, shadowKey); + this.sender.getStatistics().incEventsDroppedDueToPrimarySenderNotRunning(); + if (logger.isDebugEnabled()) { + logger.debug("register dropped event for primary queue. BucketId is " + bucketId + + ", shadowKey is " + shadowKey + ", prQ is " + prQ.getFullPath()); + } + } + } + + @Override public void run() { final boolean isDebugEnabled = logger.isDebugEnabled(); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java index 5715a35..77811c8 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java @@ -149,6 +149,12 @@ public class ParallelGatewaySenderEventProcessor extends AbstractGatewaySenderEv } } + @Override + protected void registerEventDroppedInPrimaryQueue(EntryEventImpl droppedEvent) { + logger.info("ParallelGatewaySenderEventProcessor should not process dropped event {}", + droppedEvent); + } + public void clear(PartitionedRegion pr, int bucketId) { ((ParallelGatewaySenderQueue) this.queue).clear(pr, bucketId); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/BatchDestroyOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/BatchDestroyOperation.java index debb005..d9dde9d 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/BatchDestroyOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/BatchDestroyOperation.java @@ -102,7 +102,7 @@ public class BatchDestroyOperation extends DistributedCacheOperation { } // Optimized way - for (long k = (Long) this.key; k <= this.tailKey; k++) { + for (long k = (Long) this.key; k <= this.tailKey && this.tailKey != -1; k++) { try { for (GatewayEventFilter filter : rgn.getSerialGatewaySender() .getGatewayEventFilters()) { @@ -124,6 +124,32 @@ public class BatchDestroyOperation extends DistributedCacheOperation { } } } + + // destroy dropped event from unprocessedKeys + if (this.tailKey == -1) { + SerialGatewaySenderEventProcessor ep = null; + int index = ((Long) this.key).intValue(); + if (index == -1) { + // this is SerialGatewaySenderEventProcessor + ep = (SerialGatewaySenderEventProcessor) rgn.getSerialGatewaySender() + .getEventProcessor(); + } else { + ConcurrentSerialGatewaySenderEventProcessor csgep = + (ConcurrentSerialGatewaySenderEventProcessor) rgn.getSerialGatewaySender() + .getEventProcessor(); + ep = csgep.processors.get(index); + } + if (ep != null) { + // if sender is being shutdown, the ep could be null + boolean removed = ep.basicHandlePrimaryDestroy(ev.getEventId()); + if (removed) { + if (isDebugEnabled) { + logger.debug("Removed a dropped event {} from unprocessedEvents.", + (EntryEventImpl) event); + } + } + } + } this.appliedOperation = true; } catch (CacheWriterException e) { throw new Error( diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java index ec01fd9..8ec6ce1 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java @@ -109,6 +109,35 @@ public class ConcurrentSerialGatewaySenderEventProcessor } + public void setModifiedEventId(EntryEventImpl clonedEvent, int index) { + EventID originalEventId = clonedEvent.getEventId(); + if (logger.isDebugEnabled()) { + logger.debug("The original EventId is {}", originalEventId); + } + // PARALLEL_THREAD_BUFFER * (index +1) + originalEventId.getThreadID(); + // generating threadId by the algorithm explained above used to clash with + // fakeThreadId generated by putAll + // below is new way to generate threadId so that it doesn't clash with + // any. + long newThreadId = + ThreadIdentifier.createFakeThreadIDForParallelGateway(index, originalEventId.getThreadID(), + 0 /* + * gateway sender event id index has already been applied in + * SerialGatewaySenderImpl.setModifiedEventId + */); + EventID newEventId = new EventID(originalEventId.getMembershipID(), newThreadId, + originalEventId.getSequenceID()); + if (logger.isDebugEnabled()) { + logger.debug( + "{}: Generated event id for event with key={}, index={}, original event id={}, threadId={}, new event id={}, newThreadId={}" + + ":index=" + this.sender.getEventIdIndex(), + this, clonedEvent.getKey(), index, originalEventId, + ThreadIdentifier.toDisplayString(originalEventId.getThreadID()), newEventId, + ThreadIdentifier.toDisplayString(newThreadId)); + } + clonedEvent.setEventId(newEventId); + } + public void enqueueEvent(EnumListenerEvent operation, EntryEvent event, Object substituteValue, int index) throws CacheException, IOException { // Get the appropriate gateway @@ -121,30 +150,7 @@ public class ConcurrentSerialGatewaySenderEventProcessor @Released EntryEventImpl clonedEvent = new EntryEventImpl((EntryEventImpl) event); try { - EventID originalEventId = clonedEvent.getEventId(); - if (logger.isDebugEnabled()) { - logger.debug("The original EventId is {}", originalEventId); - } - // PARALLEL_THREAD_BUFFER * (index +1) + originalEventId.getThreadID(); - // generating threadId by the algorithm explained above used to clash with - // fakeThreadId generated by putAll - // below is new way to generate threadId so that it doesn't clash with - // any. - long newThreadId = ThreadIdentifier.createFakeThreadIDForParallelGateway(index, - originalEventId.getThreadID(), - 0 /* - * gateway sender event id index has already been applied in - * SerialGatewaySenderImpl.setModifiedEventId - */); - EventID newEventId = new EventID(originalEventId.getMembershipID(), newThreadId, - originalEventId.getSequenceID()); - if (logger.isDebugEnabled()) { - logger.debug( - "{}: Generated event id for event with key={}, index={}, original event id={}, threadId={}, new event id={}, newThreadId={}", - this, event.getKey(), index, originalEventId, originalEventId.getThreadID(), - newEventId, newThreadId); - } - clonedEvent.setEventId(newEventId); + setModifiedEventId(clonedEvent, index); serialProcessor.enqueueEvent(operation, clonedEvent, substituteValue); } finally { clonedEvent.release(); @@ -375,6 +381,16 @@ public class ConcurrentSerialGatewaySenderEventProcessor } @Override + protected void registerEventDroppedInPrimaryQueue(EntryEventImpl droppedEvent) { + this.getSender().setModifiedEventId(droppedEvent); + // modified event again for concurrent SGSEP + int index = Math.abs(getHashCode(((EntryEventImpl) droppedEvent)) % this.processors.size()); + setModifiedEventId(droppedEvent, index); + + this.processors.get(index).sendBatchDestroyOperationForDroppedEvent(droppedEvent, index); + } + + @Override protected void enqueueEvent(GatewayQueueEvent event) { for (SerialGatewaySenderEventProcessor serialProcessor : this.processors) { serialProcessor.enqueueEvent(event); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java index 3fa4d6a..39609c7 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java @@ -45,6 +45,7 @@ import org.apache.geode.internal.cache.DistributedRegion; import org.apache.geode.internal.cache.EntryEventImpl; import org.apache.geode.internal.cache.EnumListenerEvent; import org.apache.geode.internal.cache.EventID; +import org.apache.geode.internal.cache.LocalRegion; import org.apache.geode.internal.cache.wan.AbstractGatewaySender; import org.apache.geode.internal.cache.wan.AbstractGatewaySender.EventWrapper; import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor; @@ -610,7 +611,7 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven } my_executor.execute(new Runnable() { public void run() { - basicHandlePrimaryDestroy(gatewayEvent); + basicHandlePrimaryDestroy(gatewayEvent.getEventId()); } }); } @@ -620,23 +621,25 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven * Just remove the event from the unprocessed events map if it is present. This method added to * fix bug 37603 */ - protected void basicHandlePrimaryDestroy(final GatewaySenderEventImpl gatewayEvent) { + protected boolean basicHandlePrimaryDestroy(final EventID eventId) { if (this.sender.isPrimary()) { // no need to do anything if we have become the primary - return; + return false; } GatewaySenderStats statistics = this.sender.getStatistics(); // Get the event from the map synchronized (unprocessedEventsLock) { if (this.unprocessedEvents == null) - return; + return false; // now we can safely use the unprocessedEvents field - EventWrapper ew = this.unprocessedEvents.remove(gatewayEvent.getEventId()); + EventWrapper ew = this.unprocessedEvents.remove(eventId); if (ew != null) { ew.event.release(); statistics.incUnprocessedEventsRemovedByPrimary(); + return true; } } + return false; } protected void basicHandlePrimaryEvent(final GatewaySenderEventImpl gatewayEvent) { @@ -865,4 +868,38 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven // @TODO This API hasn't been implemented yet throw new UnsupportedOperationException(); } + + public void sendBatchDestroyOperationForDroppedEvent(EntryEventImpl dropEvent, int index) { + EntryEventImpl destroyEvent = + EntryEventImpl.create((LocalRegion) this.queue.getRegion(), Operation.DESTROY, (long) index, + null/* newValue */, null, false, sender.getCache().getMyId()); + destroyEvent.setEventId(dropEvent.getEventId()); + destroyEvent.disallowOffHeapValues(); + destroyEvent.setTailKey(-1L); + if (logger.isDebugEnabled()) { + logger.debug( + "SerialGatewaySenderEventProcessor sends BatchDestroyOperation to secondary for event {}", + destroyEvent); + } + + try { + BatchDestroyOperation op = new BatchDestroyOperation(destroyEvent); + op.distribute(); + if (logger.isDebugEnabled()) { + logger.debug("BatchRemovalThread completed destroy of dropped event {}", dropEvent); + } + } catch (Exception ignore) { + if (logger.isDebugEnabled()) { + logger.debug( + "Exception in sending dropped event could be ignored in order not to interrupt sender starting", + ignore); + } + } + } + + @Override + protected void registerEventDroppedInPrimaryQueue(EntryEventImpl droppedEvent) { + this.getSender().setModifiedEventId(droppedEvent); + sendBatchDestroyOperationForDroppedEvent(droppedEvent, -1); + } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelAsyncEventQueueCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelAsyncEventQueueCreation.java index 6f8efa8..4686b67 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelAsyncEventQueueCreation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelAsyncEventQueueCreation.java @@ -87,5 +87,5 @@ public class ParallelAsyncEventQueueCreation extends AbstractGatewaySender } @Override - protected void setModifiedEventId(EntryEventImpl clonedEvent) {} + public void setModifiedEventId(EntryEventImpl clonedEvent) {} } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelGatewaySenderCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelGatewaySenderCreation.java index 5b025b5..257ee75 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelGatewaySenderCreation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelGatewaySenderCreation.java @@ -90,7 +90,7 @@ public class ParallelGatewaySenderCreation extends AbstractGatewaySender impleme } @Override - protected void setModifiedEventId(EntryEventImpl clonedEvent) {} + public void setModifiedEventId(EntryEventImpl clonedEvent) {} protected GatewayQueueEvent getSynchronizationEvent(Object key, long timestamp) { throw new UnsupportedOperationException(); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialAsyncEventQueueCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialAsyncEventQueueCreation.java index ce71c54..cd06661 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialAsyncEventQueueCreation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialAsyncEventQueueCreation.java @@ -86,5 +86,5 @@ public class SerialAsyncEventQueueCreation extends AbstractGatewaySender impleme } @Override - protected void setModifiedEventId(EntryEventImpl clonedEvent) {} + public void setModifiedEventId(EntryEventImpl clonedEvent) {} } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialGatewaySenderCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialGatewaySenderCreation.java index 80c04de..b0766ff 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialGatewaySenderCreation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialGatewaySenderCreation.java @@ -87,7 +87,7 @@ public class SerialGatewaySenderCreation extends AbstractGatewaySender implement } @Override - protected void setModifiedEventId(EntryEventImpl clonedEvent) {} + public void setModifiedEventId(EntryEventImpl clonedEvent) {} protected GatewayQueueEvent getSynchronizationEvent(Object key, long timestamp) { throw new UnsupportedOperationException(); diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java index d023704..f565426 100644 --- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java +++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java @@ -167,7 +167,7 @@ public class ParallelGatewaySenderImpl extends AbstractRemoteGatewaySender { } @Override - protected void setModifiedEventId(EntryEventImpl clonedEvent) { + public void setModifiedEventId(EntryEventImpl clonedEvent) { int bucketId = -1; // merged from 42004 if (clonedEvent.getRegion() instanceof DistributedRegion) { diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java index d964253..ecca896 100644 --- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java +++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java @@ -211,7 +211,7 @@ public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender { } @Override - protected void setModifiedEventId(EntryEventImpl clonedEvent) { + public void setModifiedEventId(EntryEventImpl clonedEvent) { EventID originalEventId = clonedEvent.getEventId(); long originalThreadId = originalEventId.getThreadID(); long newThreadId = originalThreadId; @@ -226,7 +226,9 @@ public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender { if (logger.isDebugEnabled()) { logger.debug( "{}: Generated event id for event with key={}, original event id={}, originalThreadId={}, new event id={}, newThreadId={}", - this, clonedEvent.getKey(), originalEventId, originalThreadId, newEventId, newThreadId); + this, clonedEvent.getKey(), originalEventId, + ThreadIdentifier.toDisplayString(originalThreadId), newEventId, + ThreadIdentifier.toDisplayString(newThreadId)); } clonedEvent.setEventId(newEventId); } diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java index caa357e..4993f24 100644 --- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java @@ -50,7 +50,6 @@ import org.apache.geode.test.dunit.RMIException; import org.apache.geode.test.dunit.SerializableRunnable; import org.apache.geode.test.dunit.VM; import org.apache.geode.test.junit.categories.DistributedTest; -import org.apache.geode.test.junit.categories.FlakyTest; import org.apache.geode.test.junit.categories.WanTest; @Category({DistributedTest.class, WanTest.class}) @@ -266,7 +265,6 @@ public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase { vm5.invoke(() -> WANTestBase.validateQueueSizeStat("ln", 0)); } - @Category({FlakyTest.class, WanTest.class}) // GEODE-5056 @Test public void testRestartSerialGatewaySendersWhilePutting() throws Throwable { Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); -- To stop receiving notification emails like this one, please contact zho...@apache.org.