This is an automated email from the ASF dual-hosted git repository.
mivanac pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new fef9a4f GEODE-9961: GatewayReceiver rethrows CancelException (#7275)
fef9a4f is described below
commit fef9a4ff58182faaaa50bfd7b009ed31101f62d4
Author: Mario Ivanac <[email protected]>
AuthorDate: Wed Jan 19 13:46:23 2022 +0100
GEODE-9961: GatewayReceiver rethrows CancelException (#7275)
---
.../sockets/command/GatewayReceiverCommand.java | 4 +
.../command/GatewayReceiverCommandTest.java | 168 +++++++++++++++++++++
2 files changed, 172 insertions(+)
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
index 5df0ebb..5ebd08f 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java
@@ -715,6 +715,10 @@ public class GatewayReceiverCommand extends BaseCommand {
private void handleException(boolean removeOnException, GatewayReceiverStats
stats, Exception e)
throws Exception {
+ if (e instanceof CancelException) {
+ throw e;
+ }
+
if (shouldThrowException(removeOnException)) {
throw e;
} else {
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommandTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommandTest.java
new file mode 100644
index 0000000..e1272b0
--- /dev/null
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommandTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets.command;
+
+
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.tier.CachedRegionHelper;
+import org.apache.geode.internal.cache.tier.sockets.Message;
+import org.apache.geode.internal.cache.tier.sockets.Part;
+import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
+import org.apache.geode.internal.cache.wan.GatewayReceiverStats;
+import org.apache.geode.internal.security.SecurityService;
+import org.apache.geode.test.junit.categories.ClientServerTest;
+
+@Category({ClientServerTest.class})
+public class GatewayReceiverCommandTest {
+
+ private static final String REGION_NAME = "region1";
+ private static final String KEY = "key1";
+ private static final Object VALUE = "value1";
+
+ private static final byte[] REMOVE_ON_EXCEPTION_BYTES = new byte[] {0};
+ private static final byte[] POSSIBLE_DUPLICATE_BYTES = new byte[] {1};
+ private static final byte[] CALLBACK_ARG_EXIST_BYTES = new byte[] {0};
+
+ @Mock
+ private EventID eventId;
+
+ @Mock
+ private Message message;
+ @Mock
+ private SecurityService securityService;
+ @Mock
+ private ServerConnection serverConnection;
+ @Mock
+ private CachedRegionHelper cachedRegionHelper;
+ @Mock
+ private GatewayReceiverStats gatewayReceiverStats;
+ @Mock
+ private InternalCache cache;
+
+
+ @Mock
+ private Part numberOfEventsPart;
+ @Mock
+ private Part batchIdPart;
+ @Mock
+ private Part dsidPart;
+ @Mock
+ private Part removeOnExceptionPart;
+ @Mock
+ private Part actionTypePart;
+ @Mock
+ private Part possibleDuplicatePart;
+ @Mock
+ private Part regionNamePart;
+ @Mock
+ private Part eventIdPart;
+ @Mock
+ private Part keyPart;
+ @Mock
+ private Part valuePart;
+ @Mock
+ private Part callbackArgExistsPart;
+ @Mock
+ private Part versionTimeStampPart;
+
+ private GatewayReceiverCommand gatewayReceiverCommand;
+
+ @Before
+ public void setUp() throws Exception {
+ gatewayReceiverCommand = (GatewayReceiverCommand)
GatewayReceiverCommand.getCommand();
+ MockitoAnnotations.openMocks(this);
+
+
when(serverConnection.getCachedRegionHelper()).thenReturn(cachedRegionHelper);
+
when(serverConnection.getCacheServerStats()).thenReturn(gatewayReceiverStats);
+ when(serverConnection.getLatestBatchIdReplied()).thenReturn(0);
+
+ when(cachedRegionHelper.getCacheForGatewayCommand()).thenReturn(cache);
+
+ when(numberOfEventsPart.getInt()).thenReturn(1);
+ when(batchIdPart.getInt()).thenReturn(1);
+ when(dsidPart.getInt()).thenReturn(1);
+
when(removeOnExceptionPart.getSerializedForm()).thenReturn(REMOVE_ON_EXCEPTION_BYTES);
+
+
when(possibleDuplicatePart.getObject()).thenReturn(POSSIBLE_DUPLICATE_BYTES);
+ when(regionNamePart.getCachedString()).thenReturn(REGION_NAME);
+ when(eventIdPart.getObject()).thenReturn(eventId);
+ when(keyPart.getStringOrObject()).thenReturn(KEY);
+ when(valuePart.getStringOrObject()).thenReturn(VALUE);
+
when(callbackArgExistsPart.getObject()).thenReturn(CALLBACK_ARG_EXIST_BYTES);
+ when(versionTimeStampPart.getLong()).thenReturn(1l);
+
+ when(message.getNumberOfParts()).thenReturn(12);
+ when(message.getPart(eq(0))).thenReturn(numberOfEventsPart);
+ when(message.getPart(eq(1))).thenReturn(batchIdPart);
+ when(message.getPart(eq(2))).thenReturn(dsidPart);
+ when(message.getPart(eq(3))).thenReturn(removeOnExceptionPart);
+ when(message.getPart(eq(4))).thenReturn(actionTypePart);
+
+ when(message.getPart(eq(5))).thenReturn(possibleDuplicatePart);
+ when(message.getPart(eq(6))).thenReturn(regionNamePart);
+ when(message.getPart(eq(7))).thenReturn(eventIdPart);
+ when(message.getPart(eq(8))).thenReturn(keyPart);
+ when(message.getPart(eq(9))).thenReturn(valuePart);
+ when(message.getPart(eq(10))).thenReturn(callbackArgExistsPart);
+ when(message.getPart(eq(11))).thenReturn(versionTimeStampPart);
+
+ }
+
+ @Test
+ public void cacheClosedAtCreateEvent() throws Exception {
+ when(cache.getRegion(any())).thenThrow(CacheClosedException.class);
+ when(actionTypePart.getInt()).thenReturn(0);
+
+ gatewayReceiverCommand.cmdExecute(message, serverConnection,
securityService, 0);
+ verify(serverConnection).setFlagProcessMessagesAsFalse();
+ }
+
+ @Test
+ public void cacheClosedAtUpdateEvent() throws Exception {
+ when(cache.getRegion(any())).thenThrow(CacheClosedException.class);
+ when(actionTypePart.getInt()).thenReturn(1);
+
+ gatewayReceiverCommand.cmdExecute(message, serverConnection,
securityService, 0);
+ verify(serverConnection).setFlagProcessMessagesAsFalse();
+ }
+
+ @Test
+ public void cacheClosedAtDestroyEvent() throws Exception {
+
+ when(message.getPart(eq(9))).thenReturn(callbackArgExistsPart);
+ when(message.getPart(eq(10))).thenReturn(versionTimeStampPart);
+
+ when(cache.getRegion(any())).thenThrow(CacheClosedException.class);
+ when(actionTypePart.getInt()).thenReturn(2);
+
+ gatewayReceiverCommand.cmdExecute(message, serverConnection,
securityService, 0);
+ verify(serverConnection).setFlagProcessMessagesAsFalse();
+ }
+
+
+}