GEODE-478: GatewaySender now handles MessageTooLargeExceptions
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/a904f147 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/a904f147 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/a904f147 Branch: refs/heads/feature/GEODE-949-2 Commit: a904f1474ec3153bc39f650d49731214f25c6230 Parents: 4d0dfc5 Author: Barry Oglesby <[email protected]> Authored: Tue Mar 8 15:55:34 2016 -0800 Committer: Barry Oglesby <[email protected]> Committed: Wed Mar 16 09:56:41 2016 -0700 ---------------------------------------------------------------------- .../internal/cache/tier/sockets/Message.java | 2 +- .../AbstractGatewaySenderEventProcessor.java | 33 +++++++-- .../parallel/ParallelGatewaySenderQueue.java | 74 ++++++++++++++++---- .../gemfire/internal/i18n/LocalizedStrings.java | 6 +- .../wan/GatewaySenderEventRemoteDispatcher.java | 31 +++++++- .../gemfire/internal/cache/wan/WANTestBase.java | 26 +++++-- ...arallelGatewaySenderOperationsDUnitTest.java | 35 +++++++++ 7 files changed, 177 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a904f147/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java index a6495e2..44c88c1 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java @@ -564,7 +564,7 @@ public class Message { msgLen = (int)(headerLen + totalPartLen); if (msgLen > MAX_MESSAGE_SIZE) { - throw new MessageTooLargeException("Message size(" + msgLen + throw new MessageTooLargeException("Message size (" + msgLen + ") exceeds gemfire.client.max-message-size setting (" + MAX_MESSAGE_SIZE + ")"); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a904f147/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java index 86ecce1..51b125a 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java @@ -38,12 +38,9 @@ import com.gemstone.gemfire.cache.EntryEvent; import com.gemstone.gemfire.cache.Operation; import com.gemstone.gemfire.cache.Region; import com.gemstone.gemfire.cache.RegionDestroyedException; -import com.gemstone.gemfire.cache.client.internal.Connection; -import com.gemstone.gemfire.cache.client.internal.pooling.ConnectionDestroyedException; import com.gemstone.gemfire.cache.wan.GatewayEventFilter; import com.gemstone.gemfire.cache.wan.GatewayQueueEvent; import com.gemstone.gemfire.cache.wan.GatewaySender; -import com.gemstone.gemfire.internal.Version; import com.gemstone.gemfire.internal.cache.BucketRegion; import com.gemstone.gemfire.internal.cache.Conflatable; import com.gemstone.gemfire.internal.cache.DistributedRegion; @@ -143,6 +140,13 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { private volatile boolean resetLastPeekedEvents; private long numEventsDispatched; + + /** + * The batchSize is the batch size being used by this processor. By default, it is the + * configured batch size of the GatewaySender. It may be automatically reduced if a + * MessageTooLargeException occurs. + */ + private int batchSize; /** * @param createThreadGroup @@ -152,6 +156,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { String string, GatewaySender sender) { super(createThreadGroup, string); this.sender = (AbstractGatewaySender)sender; + this.batchSize = sender.getBatchSize(); } abstract protected void initializeMessageQueue(String id); @@ -214,6 +219,23 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { this.resetLastPeekedEvents = true; } + protected int getBatchSize() { + return this.batchSize; + } + + protected void setBatchSize(int batchSize) { + int currentBatchSize = this.batchSize; + if (batchSize <= 0) { + this.batchSize = 1; + logger.warn(LocalizedMessage.create( + LocalizedStrings.AbstractGatewaySenderEventProcessor_ATTEMPT_TO_SET_BATCH_SIZE_FAILED, new Object[] { currentBatchSize, batchSize })); + } else { + this.batchSize = batchSize; + logger.info(LocalizedMessage.create( + LocalizedStrings.AbstractGatewaySenderEventProcessor_SET_BATCH_SIZE, new Object[] { currentBatchSize, this.batchSize })); + } + } + /** * Returns the current batch id to be used to identify the next batch. * @@ -387,7 +409,6 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { final boolean isDebugEnabled = logger.isDebugEnabled(); final boolean isTraceEnabled = logger.isTraceEnabled(); - final int batchSize = sender.getBatchSize(); final int batchTimeInterval = sender.getBatchTimeInterval(); final GatewaySenderStats statistics = this.sender.getStatistics(); @@ -417,7 +438,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { // Peek a batch if (isDebugEnabled) { - logger.debug("Attempting to peek a batch of {} events", batchSize); + logger.debug("Attempting to peek a batch of {} events", this.batchSize); } for (;;) { // check before sleeping @@ -481,7 +502,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { } }*/ } - events = this.queue.peek(batchSize, batchTimeInterval); + events = this.queue.peek(this.batchSize, batchTimeInterval); } catch (InterruptedException e) { interrupted = true; this.sender.getCancelCriterion().checkCancelInProgress(e); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a904f147/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java index c00903f..a9d0f3e 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java @@ -54,7 +54,6 @@ import com.gemstone.gemfire.cache.EvictionAttributes; import com.gemstone.gemfire.cache.PartitionAttributesFactory; import com.gemstone.gemfire.cache.Region; import com.gemstone.gemfire.cache.RegionAttributes; -import com.gemstone.gemfire.cache.RegionDestroyedException; import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl; import com.gemstone.gemfire.distributed.internal.DM; import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; @@ -143,7 +142,17 @@ public class ParallelGatewaySenderQueue implements RegionQueue { private static BatchRemovalThread removalThread = null; protected BlockingQueue<GatewaySenderEventImpl> peekedEvents = new LinkedBlockingQueue<GatewaySenderEventImpl>(); - + + /** + * The peekedEventsProcessing queue is used when the batch size is reduced due to a MessageTooLargeException + */ + private BlockingQueue<GatewaySenderEventImpl> peekedEventsProcessing = new LinkedBlockingQueue<GatewaySenderEventImpl>(); + + /** + * The peekedEventsProcessingInProgress boolean denotes that processing existing peeked events is in progress + */ + private boolean peekedEventsProcessingInProgress = false; + public final AbstractGatewaySender sender ; public static final int WAIT_CYCLE_SHADOW_BUCKET_LOAD = 10; @@ -1147,6 +1156,10 @@ public class ParallelGatewaySenderQueue implements RegionQueue { public void resetLastPeeked() { this.resetLastPeeked = true; + + // Reset the in progress boolean and queue for peeked events in progress + this.peekedEventsProcessingInProgress = false; + this.peekedEventsProcessing.clear(); } // Need to improve here.If first peek returns NULL then look in another bucket. @@ -1283,19 +1296,9 @@ public class ParallelGatewaySenderQueue implements RegionQueue { long start = System.currentTimeMillis(); long end = start + timeToWait; - if (this.resetLastPeeked) { - batch.addAll(peekedEvents); - this.resetLastPeeked = false; - if (isDebugEnabled) { - StringBuffer buffer = new StringBuffer(); - for (GatewaySenderEventImpl ge : peekedEvents) { - buffer.append("event :"); - buffer.append(ge); - } - logger.debug("Adding already peeked events to the batch {}", buffer); - } - } - + // Add peeked events + addPeekedEvents(batch, batchSize); + int bId = -1; while (batch.size() < batchSize) { if (areLocalBucketQueueRegionsPresent() @@ -1372,6 +1375,47 @@ public class ParallelGatewaySenderQueue implements RegionQueue { return batch; } + private void addPeekedEvents(List batch, int batchSize) { + if (this.resetLastPeeked) { + if (this.peekedEventsProcessingInProgress) { + // Peeked event processing is in progress. This means that the original peekedEvents + // contained > batch size events due to a reduction in the batch size. Create a batch + // from the peekedEventsProcessing queue. + addPreviouslyPeekedEvents(batch, batchSize); + } else if (peekedEvents.size() <= batchSize) { + // This is the normal case. The connection was lost while processing a batch. + // This recreates the batch from the current peekedEvents. + batch.addAll(peekedEvents); + this.resetLastPeeked = false; + } else { + // The peekedEvents queue is > batch size. This means that the previous batch size was + // reduced due to MessageTooLargeException. Create a batch from the peekedEventsProcessing queue. + this.peekedEventsProcessing.addAll(this.peekedEvents); + this.peekedEventsProcessingInProgress = true; + addPreviouslyPeekedEvents(batch, batchSize); + } + if (logger.isDebugEnabled()) { + StringBuffer buffer = new StringBuffer(); + for (Object ge : batch) { + buffer.append("event :"); + buffer.append(ge); + } + logger.debug("Adding already peeked events to the batch {}", buffer); + } + } + } + + private void addPreviouslyPeekedEvents(List batch, int batchSize) { + for (int i=0; i<batchSize; i++) { + batch.add(this.peekedEventsProcessing.remove()); + if (this.peekedEventsProcessing.isEmpty()) { + this.resetLastPeeked = false; + this.peekedEventsProcessingInProgress = false; + break; + } + } + } + protected void blockProcesorThreadIfRequired() throws InterruptedException { queueEmptyLock.lock(); try { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a904f147/geode-core/src/main/java/com/gemstone/gemfire/internal/i18n/LocalizedStrings.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/i18n/LocalizedStrings.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/i18n/LocalizedStrings.java index 8147718..3996692 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/i18n/LocalizedStrings.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/i18n/LocalizedStrings.java @@ -2145,7 +2145,11 @@ public class LocalizedStrings extends ParentLocalizedStrings { public static final StringId AUTH_FAILED_TO_ACQUIRE_AUTHINITIALIZE_INSTANCE = new StringId(6611, "AuthInitialize instance could not be obtained"); public static final StringId AUTH_FAILED_TO_OBTAIN_CREDENTIALS_IN_0_USING_AUTHINITIALIZE_1_2 = new StringId(6612, "Failed to obtain credentials using AuthInitialize [{1}]. {2}"); public static final StringId DistributedSystem_BACKUP_ALREADY_IN_PROGRESS = new StringId(6613, "A backup is already in progress."); - + + public static final StringId AbstractGatewaySenderEventProcessor_SET_BATCH_SIZE = new StringId(6614, "Set the batch size from {0} to {1} events"); + public static final StringId AbstractGatewaySenderEventProcessor_ATTEMPT_TO_SET_BATCH_SIZE_FAILED = new StringId(6615, "Attempting to set the batch size from {0} to {1} events failed. Instead it was set to 1."); + public static final StringId GatewaySenderEventRemoteDispatcher_MESSAGE_TOO_LARGE_EXCEPTION = new StringId(6616, "The following exception occurred attempting to send a batch of {0} events. The batch will be tried again after reducing the batch size to {1} events."); + /** Testing strings, messageId 90000-99999 **/ /** These are simple messages for testing, translated with Babelfish. **/ http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a904f147/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java index 9da6748..22dff3d 100644 --- a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java +++ b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java @@ -23,6 +23,8 @@ import java.util.List; import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; +import com.gemstone.gemfire.GemFireIOException; +import com.gemstone.gemfire.internal.cache.tier.sockets.MessageTooLargeException; import org.apache.logging.log4j.Logger; import com.gemstone.gemfire.CancelException; @@ -31,7 +33,6 @@ import com.gemstone.gemfire.cache.RegionDestroyedException; import com.gemstone.gemfire.cache.client.ServerConnectivityException; import com.gemstone.gemfire.cache.client.ServerOperationException; import com.gemstone.gemfire.cache.client.internal.Connection; -import com.gemstone.gemfire.cache.client.internal.ServerProxy; import com.gemstone.gemfire.cache.client.internal.pooling.ConnectionDestroyedException; import com.gemstone.gemfire.cache.wan.GatewaySender; import com.gemstone.gemfire.distributed.internal.ServerLocation; @@ -151,7 +152,9 @@ public class GatewaySenderEventRemoteDispatcher implements try { long start = statistics.startTime(); success =_dispatchBatch(events, isRetry); - statistics.endBatch(start, events.size()); + if (success) { + statistics.endBatch(start, events.size()); + } } catch (GatewaySenderException ge) { Throwable t = ge.getCause(); @@ -159,7 +162,8 @@ public class GatewaySenderEventRemoteDispatcher implements // if our pool is shutdown then just be silent } else if (t instanceof IOException || t instanceof ServerConnectivityException - || t instanceof ConnectionDestroyedException) { + || t instanceof ConnectionDestroyedException + || t instanceof MessageTooLargeException) { this.processor.handleException(); // If the cause is an IOException or a ServerException, sleep and retry. // Sleep for a bit and recheck. @@ -243,6 +247,27 @@ public class GatewaySenderEventRemoteDispatcher implements LocalizedStrings.GatewayEventRemoteDispatcher_0_EXCEPTION_DURING_PROCESSING_BATCH_1_ON_CONNECTION_2.toLocalizedString( new Object[] {this, Integer.valueOf(currentBatchId), connection}), ex); } + catch (GemFireIOException e) { + Throwable t = e.getCause(); + if (t instanceof MessageTooLargeException) { + // A MessageTooLargeException has occurred. + // Do not process the connection as dead since it is not dead. + ex = (MessageTooLargeException)t; + // Reduce the batch size by half of the configured batch size or number of events in the current batch (whichever is less) + int newBatchSize = Math.min(events.size(), this.processor.getBatchSize())/2; + logger.warn(LocalizedMessage.create( + LocalizedStrings.GatewaySenderEventRemoteDispatcher_MESSAGE_TOO_LARGE_EXCEPTION, new Object[] { events.size(), newBatchSize }), e); + this.processor.setBatchSize(newBatchSize); + } + else { + ex = e; + // keep using the connection if we had a MessageTooLargeException. Else, destroy it + destroyConnection(); + } + throw new GatewaySenderException( + LocalizedStrings.GatewayEventRemoteDispatcher_0_EXCEPTION_DURING_PROCESSING_BATCH_1_ON_CONNECTION_2.toLocalizedString( + new Object[] {this, Integer.valueOf(currentBatchId), connection}), ex); + } catch (Exception e) { // An Exception has occurred. Get its cause. Throwable t = e.getCause(); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a904f147/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java index 0a1a7ef..5da6b5c 100644 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java +++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java @@ -2257,7 +2257,7 @@ public class WANTestBase extends DistributedTestCase{ public static void createSender(String dsName, int remoteDsId, boolean isParallel, Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent, - GatewayEventFilter filter, boolean isManulaStart) { + GatewayEventFilter filter, boolean isManualStart) { final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect"); try { File persistentDirectory = new File(dsName + "_disk_" @@ -2270,7 +2270,7 @@ public class WANTestBase extends DistributedTestCase{ gateway.setParallel(true); gateway.setMaximumQueueMemory(maxMemory); gateway.setBatchSize(batchSize); - gateway.setManualStart(isManulaStart); + gateway.setManualStart(isManualStart); //set dispatcher threads gateway.setDispatcherThreads(numDispatcherThreadsForTheRun); ((InternalGatewaySenderFactory) gateway) @@ -2294,7 +2294,7 @@ public class WANTestBase extends DistributedTestCase{ GatewaySenderFactory gateway = cache.createGatewaySenderFactory(); gateway.setMaximumQueueMemory(maxMemory); gateway.setBatchSize(batchSize); - gateway.setManualStart(isManulaStart); + gateway.setManualStart(isManualStart); //set dispatcher threads gateway.setDispatcherThreads(numDispatcherThreadsForTheRun); ((InternalGatewaySenderFactory) gateway) @@ -3056,7 +3056,25 @@ public class WANTestBase extends DistributedTestCase{ // r.destroy(i); // } } - + + + public static void doPuts(String regionName, int numPuts, Object value) { + IgnoredException exp1 = IgnoredException.addIgnoredException(InterruptedException.class + .getName()); + IgnoredException exp2 = IgnoredException.addIgnoredException(GatewaySenderException.class + .getName()); + try { + Region r = cache.getRegion(Region.SEPARATOR + regionName); + assertNotNull(r); + for (long i = 0; i < numPuts; i++) { + r.put(i, value); + } + } finally { + exp1.remove(); + exp2.remove(); + } + } + public static void doPuts(String regionName, int numPuts) { IgnoredException exp1 = IgnoredException.addIgnoredException(InterruptedException.class .getName()); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a904f147/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java index 9e1b28c..f929d89 100644 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java +++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java @@ -16,8 +16,10 @@ */ package com.gemstone.gemfire.internal.cache.wan.parallel; +import com.gemstone.gemfire.GemFireIOException; import com.gemstone.gemfire.cache.Region; import com.gemstone.gemfire.cache.RegionDestroyedException; +import com.gemstone.gemfire.internal.cache.tier.sockets.MessageTooLargeException; import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender; import com.gemstone.gemfire.internal.cache.wan.GatewaySenderException; import com.gemstone.gemfire.internal.cache.wan.WANTestBase; @@ -555,6 +557,39 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { vm7.invoke(() -> WANTestBase.verifySenderDestroyed( "ln", true )); } + public void testParallelGatewaySenderMessageTooLargeException() { + Integer[] locatorPorts = createLNAndNYLocators(); + Integer lnPort = locatorPorts[0]; + Integer nyPort = locatorPorts[1]; + + // Create and start sender with reduced maximum message size and 1 dispatcher thread + String regionName = getTestMethodName() + "_PR"; + vm4.invoke(() -> setMaximumMessageSize( 1024*1024 )); + vm4.invoke(() -> createCache( lnPort )); + vm4.invoke(() -> setNumDispatcherThreadsForTheRun( 1 )); + vm4.invoke(() -> createSender( "ln", 2, true, 100, 100, false, false, null, false )); + vm4.invoke(() -> createPartitionedRegion( regionName, "ln", 0, 100, isOffHeap() )); + + // Do puts + int numPuts = 200; + vm4.invoke(() -> doPuts( regionName, numPuts, new byte[11000] )); + validateRegionSizes(regionName, numPuts, vm4); + + // Start receiver + IgnoredException ignoredMTLE = IgnoredException.addIgnoredException(MessageTooLargeException.class.getName(), vm4); + IgnoredException ignoredGIOE = IgnoredException.addIgnoredException(GemFireIOException.class.getName(), vm4); + vm2.invoke(() -> createReceiver( nyPort )); + vm2.invoke(() -> createPartitionedRegion( regionName, null, 0, 100, isOffHeap() )); + validateRegionSizes( regionName, numPuts, vm2 ); + ignoredMTLE.remove(); + ignoredGIOE.remove(); + } + + private void setMaximumMessageSize(int maximumMessageSizeBytes) { + System.setProperty("gemfire.client.max-message-size", String.valueOf(maximumMessageSizeBytes)); + LogWriterUtils.getLogWriter().info("Set gemfire.client.max-message-size: " + System.getProperty("gemfire.client.max-message-size")); + } + private void createSendersReceiversAndPartitionedRegion(Integer lnPort, Integer nyPort, boolean createAccessors, boolean startSenders) { // Note: This is a test-specific method used by several test to create
