GEODE-1067: The dispatcher now handles IllegalStateException by retrying batch without the released event
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/ff69aeae Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/ff69aeae Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/ff69aeae Branch: refs/heads/feature/GEODE-1050 Commit: ff69aeae9615d7263075079e8e5f5180bc350dce Parents: 5b6a2cd Author: Barry Oglesby <[email protected]> Authored: Wed Mar 16 12:27:39 2016 -0700 Committer: Barry Oglesby <[email protected]> Committed: Thu Mar 17 10:11:47 2016 -0700 ---------------------------------------------------------------------- .../wan/AbstractGatewaySenderEventProcessor.java | 15 +++++++++++++++ .../internal/cache/wan/GatewaySenderEventImpl.java | 7 +++++++ .../wan/GatewaySenderEventRemoteDispatcher.java | 15 +++++++++++---- 3 files changed, 33 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff69aeae/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java index 51b125a..5020cf2 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java @@ -520,6 +520,21 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { filteredList = new ArrayList<GatewaySenderEventImpl>(); filteredList.addAll(events); + + // If the exception has been set and its cause is an IllegalStateException, + // remove all events whose serialized 
value is no longer available + if (this.exception != null && this.exception.getCause() != null + && this.exception.getCause() instanceof IllegalStateException) { + for (Iterator<GatewaySenderEventImpl> i = filteredList.iterator(); i.hasNext();) { + GatewaySenderEventImpl event = i.next(); + if (event.isSerializedValueNotAvailable()) { + i.remove(); + } + } + this.exception = null; + } + + // Filter the events for (GatewayEventFilter filter : sender.getGatewayEventFilters()) { Iterator<GatewaySenderEventImpl> itr = filteredList.iterator(); while (itr.hasNext()) { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff69aeae/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventImpl.java index d8922f8..6f284b5 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventImpl.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventImpl.java @@ -131,6 +131,8 @@ public class GatewaySenderEventImpl implements protected transient Object valueObj; protected transient boolean valueObjReleased; + private transient boolean serializedValueNotAvailable; + /** * Whether the value is a serialized object or just a byte[] */ @@ -662,6 +664,10 @@ public class GatewaySenderEventImpl implements } } + public boolean isSerializedValueNotAvailable() { + return this.serializedValueNotAvailable; + } + /** * If the value owned of this event is just bytes return that byte array; * otherwise serialize the value object and return the serialized bytes. 
@@ -698,6 +704,7 @@ public class GatewaySenderEventImpl implements this.value = result; } else if (result == null) { if (this.valueObjReleased) { + this.serializedValueNotAvailable = true; throw new IllegalStateException("Value is no longer available. getSerializedValue must be called before processEvents returns."); } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff69aeae/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java index 22dff3d..ad2be2b 100644 --- a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java +++ b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java @@ -163,7 +163,8 @@ public class GatewaySenderEventRemoteDispatcher implements } else if (t instanceof IOException || t instanceof ServerConnectivityException || t instanceof ConnectionDestroyedException - || t instanceof MessageTooLargeException) { + || t instanceof MessageTooLargeException + || t instanceof IllegalStateException) { this.processor.handleException(); // If the cause is an IOException or a ServerException, sleep and retry. // Sleep for a bit and recheck. 
@@ -268,6 +269,12 @@ public class GatewaySenderEventRemoteDispatcher implements LocalizedStrings.GatewayEventRemoteDispatcher_0_EXCEPTION_DURING_PROCESSING_BATCH_1_ON_CONNECTION_2.toLocalizedString( new Object[] {this, Integer.valueOf(currentBatchId), connection}), ex); } + catch (IllegalStateException e) { + this.processor.setException(new GatewaySenderException(e)); + throw new GatewaySenderException( + LocalizedStrings.GatewayEventRemoteDispatcher_0_EXCEPTION_DURING_PROCESSING_BATCH_1_ON_CONNECTION_2.toLocalizedString( + new Object[] {this, Integer.valueOf(currentBatchId), connection}), e); + } catch (Exception e) { // An Exception has occurred. Get its cause. Throwable t = e.getCause(); @@ -321,7 +328,7 @@ public class GatewaySenderEventRemoteDispatcher implements if (cache != null && !cache.isClosed()) { if (this.sender.isPrimary() && (this.connection != null)) { if (this.ackReaderThread == null || !this.ackReaderThread.isRunning()) { - this.ackReaderThread = new AckReaderThread(this.sender); + this.ackReaderThread = new AckReaderThread(this.sender, this.processor); this.ackReaderThread.start(); this.ackReaderThread.waitForRunningAckReaderThreadRunningState(); } @@ -541,8 +548,8 @@ public class GatewaySenderEventRemoteDispatcher implements private volatile boolean ackReaderThreadRunning = false; - public AckReaderThread(GatewaySender sender) { - super("AckReaderThread for : " + sender.getId()); + public AckReaderThread(GatewaySender sender, AbstractGatewaySenderEventProcessor processor) { + super("AckReaderThread for : " + processor.getName()); this.setDaemon(true); this.cache = (GemFireCacheImpl)((AbstractGatewaySender)sender).getCache(); }
