This is an automated email from the ASF dual-hosted git repository.
mkevo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new ac00f3c7e1 GEODE-10281: Fix WAN data inconsistency (#7665)
ac00f3c7e1 is described below
commit ac00f3c7e13ab7c4b5ed59af48381da97f6d1c0b
Author: Jakov Varenina <[email protected]>
AuthorDate: Thu Jul 7 14:15:38 2022 +0200
GEODE-10281: Fix WAN data inconsistency (#7665)
---
.../internal/cache/wan/GatewaySenderEventImpl.java | 2 +-
.../cache/wan/GatewaySenderEventImplTest.java | 87 ++++--
...eplicateRegionWithSerialGwsDistributedTest.java | 333 +++++++++++++++++++++
3 files changed, 393 insertions(+), 29 deletions(-)
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
index 494e499168..d18a9a5d68 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
@@ -853,7 +853,7 @@ public class GatewaySenderEventImpl
// If the message is an update, it may be conflatable. If it is a
// create, destroy, invalidate or destroy-region, it is not conflatable.
// Only updates are conflated.
- return isUpdate();
+ return isUpdate() && !isConcurrencyConflict();
}
@Override
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImplTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImplTest.java
index cec3e4b5a2..cf1f5d100e 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImplTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImplTest.java
@@ -33,13 +33,15 @@ import java.io.DataInput;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.stream.Stream;
-import junitparams.Parameters;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-import org.junit.runner.RunWith;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.MethodSource;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.TransactionId;
@@ -61,18 +63,16 @@ import
org.apache.geode.internal.serialization.VersionedDataInputStream;
import org.apache.geode.internal.serialization.VersionedDataOutputStream;
import org.apache.geode.internal.util.BlobHelper;
import org.apache.geode.test.fake.Fakes;
-import org.apache.geode.test.junit.runners.GeodeParamsRunner;
-@RunWith(GeodeParamsRunner.class)
public class GatewaySenderEventImplTest {
private GemFireCacheImpl cache;
- @Rule
- public TestName testName = new TestName();
+ private String testName;
- @Before
- public void setUpGemFire() {
+ @BeforeEach
+ public void setUpGemFire(TestInfo testInfo) {
+ testName = testInfo.getDisplayName();
createCache();
}
@@ -110,8 +110,8 @@ public class GatewaySenderEventImplTest {
assertThat(gatewaySenderEvent.getTransactionId()).isNotNull();
}
- @Test
- @Parameters(method = "getVersionsAndExpectedInvocations")
+ @ParameterizedTest
+ @MethodSource("getVersionsAndExpectedInvocations")
public void
testSerializingDataFromCurrentVersionToOldVersion(VersionAndExpectedInvocations
vaei)
throws IOException {
GatewaySenderEventImpl gatewaySenderEvent =
spy(GatewaySenderEventImpl.class);
@@ -129,8 +129,8 @@ public class GatewaySenderEventImplTest {
any());
}
- @Test
- @Parameters(method = "getVersionsAndExpectedInvocations")
+ @ParameterizedTest
+ @MethodSource("getVersionsAndExpectedInvocations")
public void testDeserializingDataFromOldVersionToCurrentVersion(
VersionAndExpectedInvocations vaei)
throws IOException, ClassNotFoundException {
@@ -151,18 +151,17 @@ public class GatewaySenderEventImplTest {
any());
}
- private VersionAndExpectedInvocations[] getVersionsAndExpectedInvocations() {
- return new VersionAndExpectedInvocations[] {
- new VersionAndExpectedInvocations(GEODE_1_8_0, 1, 0, 0),
- new VersionAndExpectedInvocations(GEODE_1_13_0, 1, 1, 0),
- new VersionAndExpectedInvocations(GEODE_1_14_0, 1, 1, 1)
- };
+ private static Stream<Arguments> getVersionsAndExpectedInvocations() {
+ return Stream.of(
+ Arguments.of(new VersionAndExpectedInvocations(GEODE_1_8_0, 1, 0, 0)),
+ Arguments.of(new VersionAndExpectedInvocations(GEODE_1_13_0, 1, 1, 0)),
+ Arguments.of(new VersionAndExpectedInvocations(GEODE_1_14_0, 1, 1,
1)));
}
@Test
public void testEquality() throws Exception {
LocalRegion region = mock(LocalRegion.class);
- when(region.getFullPath()).thenReturn(testName.getMethodName() +
"_region");
+ when(region.getFullPath()).thenReturn(testName + "_region");
when(region.getCache()).thenReturn(cache);
Object event =
ParallelGatewaySenderHelper.createGatewaySenderEvent(region, Operation.CREATE,
"key1", "value1", 0, 0, 0, 0);
@@ -209,7 +208,7 @@ public class GatewaySenderEventImplTest {
assertThat(event).isNotEqualTo(eventDifferentValue);
LocalRegion region2 = mock(LocalRegion.class);
- when(region2.getFullPath()).thenReturn(testName.getMethodName() +
"_region2");
+ when(region2.getFullPath()).thenReturn(testName + "_region2");
when(region2.getCache()).thenReturn(cache);
Object eventDifferentRegion =
ParallelGatewaySenderHelper.createGatewaySenderEvent(region2,
Operation.CREATE,
@@ -221,7 +220,7 @@ public class GatewaySenderEventImplTest {
public void testSerialization() throws Exception {
// Set up test
LocalRegion region = mock(LocalRegion.class);
- when(region.getFullPath()).thenReturn(testName.getMethodName() +
"_region");
+ when(region.getFullPath()).thenReturn(testName + "_region");
when(region.getCache()).thenReturn(cache);
TXId txId = new TXId(cache.getMyId(), 0);
when(region.getTXId()).thenReturn(txId);
@@ -348,12 +347,13 @@ public class GatewaySenderEventImplTest {
return cacheEvent;
}
- @Parameters({"true, true", "true, false", "false, false"})
+ @ParameterizedTest
+ @CsvSource({"true,true", "true,false", "false,false"})
public void testCreation_WithAfterUpdateWithGenerateCallbacks(boolean
isGenerateCallbacks,
boolean isCallbackArgumentNull)
throws IOException {
- InternalRegion region = mock(InternalRegion.class);
- when(region.getFullPath()).thenReturn(testName.getMethodName() +
"_region");
+ InternalRegion region = mock(LocalRegion.class);
+ when(region.getFullPath()).thenReturn(testName + "_region");
Operation operation = mock(Operation.class);
when(operation.isLocalLoad()).thenReturn(true);
@@ -377,6 +377,37 @@ public class GatewaySenderEventImplTest {
assertThat(event.getAction()).isEqualTo(action);
}
+ @Test
+ public void testShouldNotBeConflatedCreate() throws IOException {
+ final EntryEventImpl cacheEvent =
mockEntryEventImpl(mock(TransactionId.class));
+
+ final GatewaySenderEventImpl gatewaySenderEvent =
+ new GatewaySenderEventImpl(EnumListenerEvent.AFTER_CREATE, cacheEvent,
null, INCLUDE);
+
+ assertThat(gatewaySenderEvent.shouldBeConflated()).isFalse();
+ }
+
+ @Test
+ public void testShouldBeConflatedUpdate() throws IOException {
+ final EntryEventImpl cacheEvent =
mockEntryEventImpl(mock(TransactionId.class));
+
+ final GatewaySenderEventImpl gatewaySenderEvent =
+ new GatewaySenderEventImpl(EnumListenerEvent.AFTER_UPDATE, cacheEvent,
null, INCLUDE);
+
+ assertThat(gatewaySenderEvent.shouldBeConflated()).isTrue();
+ }
+
+ @Test
+ public void testShouldNotBeConflatedUpdateConcurrentConflict() throws
IOException {
+ final EntryEventImpl cacheEvent =
mockEntryEventImpl(mock(TransactionId.class));
+ when(cacheEvent.isConcurrencyConflict()).thenReturn(true);
+
+ final GatewaySenderEventImpl gatewaySenderEvent =
+ new GatewaySenderEventImpl(EnumListenerEvent.AFTER_UPDATE, cacheEvent,
null, INCLUDE);
+
+ assertThat(gatewaySenderEvent.shouldBeConflated()).isFalse();
+ }
+
public static class VersionAndExpectedInvocations {
private final KnownVersion version;
diff --git
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/InternalConflictResolutionReplicateRegionWithSerialGwsDistributedTest.java
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/InternalConflictResolutionReplicateRegionWithSerialGwsDistributedTest.java
new file mode 100644
index 0000000000..c940ade012
--- /dev/null
+++
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/InternalConflictResolutionReplicateRegionWithSerialGwsDistributedTest.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.serial;
+
+import static
org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID;
+import static
org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
+import static
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewaySenderMXBeanProxy;
+import static
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifySenderState;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.AbstractMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.CacheWriter;
+import org.apache.geode.cache.CacheWriterException;
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionEvent;
+import org.apache.geode.cache.wan.GatewayReceiver;
+import org.apache.geode.cache.wan.GatewaySender;
+import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.internal.cache.RegionQueue;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.management.internal.cli.util.CommandStringBuilder;
+import org.apache.geode.management.internal.i18n.CliStrings;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.WanTest;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+
+@Category({WanTest.class})
+public class
InternalConflictResolutionReplicateRegionWithSerialGwsDistributedTest
+ implements Serializable {
+
+ @Rule
+ public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(9);
+
+ @Rule
+ public transient GfshCommandRule gfsh = new GfshCommandRule();
+
+ public static boolean ENTRY_CONFLICT_WINNER_HAS_REACHED_THE_REDUNDANT_SERVER;
+
+ private MemberVM locator1Site2;
+
+ private MemberVM server1Site1;
+ private MemberVM server2Site1;
+
+ private MemberVM server1Site2;
+ private MemberVM server2Site2;
+
+ private int server1Site2Port;
+ private int server2Site2Port;
+
+ private ClientVM clientConnectedToServer1Site2;
+ private ClientVM clientConnectedToServer2Site2;
+
+ private static final String DISTRIBUTED_SYSTEM_ID_SITE1 = "1";
+ private static final String DISTRIBUTED_SYSTEM_ID_SITE2 = "2";
+ private static final String REGION_NAME = "test1";
+
+ private static final String GATEWAY_SENDER_ID = "ln";
+
+ private final Map.Entry<Integer, Integer> ENTRY_INITIAL = new
AbstractMap.SimpleEntry<>(1, 0);
+ private final Map.Entry<Integer, Integer> ENTRY_CONFLICT_RESOLUTION_WINNER =
+ new AbstractMap.SimpleEntry<>(1, 1);
+ private final Map.Entry<Integer, Integer> ENTRY_CONFLICT_RESOLUTION_LOSER =
+ new AbstractMap.SimpleEntry<>(1, 2);
+
+ @Before
+ public void setupMultiSite() throws Exception {
+ Properties props = new Properties();
+ props.setProperty(DISTRIBUTED_SYSTEM_ID, DISTRIBUTED_SYSTEM_ID_SITE1);
+ MemberVM locator1Site1 = clusterStartupRule.startLocatorVM(0, props);
+ MemberVM locator2Site1 = clusterStartupRule.startLocatorVM(1, props,
locator1Site1.getPort());
+
+ // start servers for site #1
+ server1Site1 =
+ clusterStartupRule.startServerVM(2, locator1Site1.getPort(),
locator2Site1.getPort());
+ server2Site1 =
+ clusterStartupRule.startServerVM(3, locator1Site1.getPort(),
locator2Site1.getPort());
+ connectGfshToSite(locator1Site1);
+
+ // create replicate region on site #1
+ CommandStringBuilder regionCmd = new
CommandStringBuilder(CliStrings.CREATE_REGION);
+ regionCmd.addOption(CliStrings.CREATE_REGION__REGION, REGION_NAME);
+ regionCmd.addOption(CliStrings.CREATE_REGION__REGIONSHORTCUT, "REPLICATE");
+
+ gfsh.executeAndAssertThat(regionCmd.toString()).statusIsSuccess();
+
+ String csb = new CommandStringBuilder(CliStrings.CREATE_GATEWAYRECEIVER)
+ .addOption(CliStrings.CREATE_GATEWAYRECEIVER__BINDADDRESS, "localhost")
+ .getCommandString();
+
+ gfsh.executeAndAssertThat(csb).statusIsSuccess();
+
+ server1Site1.invoke(
+
InternalConflictResolutionReplicateRegionWithSerialGwsDistributedTest::verifyReceiverState);
+ server2Site1.invoke(
+
InternalConflictResolutionReplicateRegionWithSerialGwsDistributedTest::verifyReceiverState);
+
+ props.setProperty(DISTRIBUTED_SYSTEM_ID, DISTRIBUTED_SYSTEM_ID_SITE2);
+ props.setProperty(REMOTE_LOCATORS,
+ "localhost[" + locator1Site1.getPort() + "],localhost[" +
locator2Site1.getPort() + "]");
+ locator1Site2 = clusterStartupRule.startLocatorVM(5, props);
+
+ // start servers for site #2
+ server1Site2 = clusterStartupRule.startServerVM(6,
locator1Site2.getPort());
+ server2Site2 = clusterStartupRule.startServerVM(7,
locator1Site2.getPort());
+
+ server2Site2Port = server2Site2.getPort();
+ server1Site2Port = server1Site2.getPort();
+
+ // create gateway-sender on site #2
+ connectGfshToSite(locator1Site2);
+ String command = new CommandStringBuilder(CliStrings.CREATE_GATEWAYSENDER)
+ .addOption(CliStrings.MEMBERS, server2Site2.getName())
+ .addOption(CliStrings.CREATE_GATEWAYSENDER__ID, GATEWAY_SENDER_ID)
+ .addOption(CliStrings.CREATE_GATEWAYSENDER__REMOTEDISTRIBUTEDSYSTEMID,
"1")
+ .addOption(CliStrings.CREATE_GATEWAYSENDER__PARALLEL, "false")
+ .addOption(CliStrings.CREATE_GATEWAYSENDER__ENABLEBATCHCONFLATION,
"true")
+ .getCommandString();
+ gfsh.executeAndAssertThat(command).statusIsSuccess();
+
+ verifyGatewaySenderState(server2Site2, false);
+
+ executeGatewaySenderActionCommandSite2(CliStrings.PAUSE_GATEWAYSENDER);
+
+ // create replicate region (attached to the gateway sender) on site #2
+ regionCmd = new CommandStringBuilder(CliStrings.CREATE_REGION);
+ regionCmd.addOption(CliStrings.CREATE_REGION__REGION, REGION_NAME);
+ regionCmd.addOption(CliStrings.CREATE_REGION__REGIONSHORTCUT, "REPLICATE");
+ regionCmd.addOption(CliStrings.CREATE_REGION__GATEWAYSENDERID,
GATEWAY_SENDER_ID);
+ gfsh.executeAndAssertThat(regionCmd.toString()).statusIsSuccess();
+ }
+
+ @Test
+ public void testEventIsNotConflatedWhenConcurrentModificationIsDetected()
throws Exception {
+ startClientToServer1Site2(server1Site2Port);
+ startClientToServer2Site2(server2Site2Port);
+
+ clientConnectedToServer2Site2.invoke(() ->
executePutOperation(ENTRY_INITIAL));
+ waitUntilEventIsConsistentlyReplicatedAcrossServers(ENTRY_INITIAL,
server1Site2, server2Site2);
+
+ // Configure cache writer on server to delay writing of entry in order to
provoke
+ // the internal conflict
+ server1Site2.invoke(() -> {
+ InternalRegion region =
+ ClusterStartupRule.getCache().getInternalRegionByPath("/" +
REGION_NAME);
+ region.getAttributesMutator().setCacheWriter(new
TestCacheWriterDelayWritingOfEntry(
+ ENTRY_CONFLICT_RESOLUTION_WINNER, ENTRY_CONFLICT_RESOLUTION_LOSER));
+ });
+
+ clientConnectedToServer2Site2.invokeAsync(() -> executePutOperation(
+ ENTRY_CONFLICT_RESOLUTION_WINNER));
+
+ server1Site2.invoke(() -> await().untilAsserted(() -> assertThat(
+
InternalConflictResolutionReplicateRegionWithSerialGwsDistributedTest.ENTRY_CONFLICT_WINNER_HAS_REACHED_THE_REDUNDANT_SERVER)
+ .isTrue()));
+
+ clientConnectedToServer1Site2.invokeAsync(() -> executePutOperation(
+ ENTRY_CONFLICT_RESOLUTION_LOSER));
+
+ // Check that expected entry has won the internal conflict resolution
+
waitUntilEventIsConsistentlyReplicatedAcrossServers(ENTRY_CONFLICT_RESOLUTION_WINNER,
+ server1Site2,
+ server2Site2);
+
+ server2Site2.invoke(
+
InternalConflictResolutionReplicateRegionWithSerialGwsDistributedTest::awaitQueueSize);
+ executeGatewaySenderActionCommandSite2(CliStrings.RESUME_GATEWAYSENDER);
+
+ // check that expected event is replicated to the remote cluster
+
waitUntilEventIsConsistentlyReplicatedAcrossServers(ENTRY_CONFLICT_RESOLUTION_WINNER,
+ server1Site1,
+ server2Site1);
+ }
+
+ private void waitUntilEventIsConsistentlyReplicatedAcrossServers(
+ final Map.Entry<Integer, Integer> entry,
+ MemberVM... servers) {
+ await().untilAsserted(() ->
isEventIsConsistentlyReplicatedAcrossServers(entry, servers));
+ }
+
+ private static void isEventIsConsistentlyReplicatedAcrossServers(
+ final Map.Entry<Integer, Integer> entry,
+ MemberVM... servers) {
+ for (MemberVM server : servers) {
+ assertThat(server.invoke(() -> doesEventExistOnServer(entry))).isTrue();
+ }
+ }
+
+ private static boolean doesEventExistOnServer(Map.Entry<Integer, Integer>
entry) {
+ Region<Integer, Integer> region =
+ ClusterStartupRule.getCache().getRegion("/" + REGION_NAME);
+ return Objects.equals(region.get(entry.getKey()), entry.getValue());
+ }
+
+ private void executeGatewaySenderActionCommandSite2(final String action)
throws Exception {
+ connectGfshToSite(locator1Site2);
+ CommandStringBuilder regionCmd = new CommandStringBuilder(action);
+ regionCmd.addOption(CliStrings.MEMBERS, server2Site2.getName());
+ regionCmd.addOption(CliStrings.PAUSE_GATEWAYSENDER__ID, GATEWAY_SENDER_ID);
+ gfsh.executeAndAssertThat(regionCmd.toString()).statusIsSuccess();
+
+ verifyGatewaySenderState(server2Site2,
CliStrings.PAUSE_GATEWAYSENDER.equals(action));
+ }
+
+ private void executePutOperation(Map.Entry<Integer, Integer> entry) {
+ Region<Integer, Integer> region =
+ ClusterStartupRule.clientCacheRule.getCache().getRegion(REGION_NAME);
+ region.put(entry.getKey(), entry.getValue());
+ }
+
+ private static void awaitQueueSize() {
+ await()
+ .untilAsserted(() -> validateQueueSize(
+
InternalConflictResolutionReplicateRegionWithSerialGwsDistributedTest.GATEWAY_SENDER_ID,
+ 3));
+ }
+
+ private static void validateQueueSize(String senderId, int numQueueEntries) {
+ GatewaySender sender =
ClusterStartupRule.getCache().getGatewaySender(senderId);
+ Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues();
+ int size = 0;
+ for (RegionQueue q : queues) {
+ size += q.size();
+ }
+ assertThat(size).isEqualTo(numQueueEntries);
+ }
+
+ private static void verifyReceiverState() {
+ Set<GatewayReceiver> receivers =
ClusterStartupRule.getCache().getGatewayReceivers();
+ for (GatewayReceiver receiver : receivers) {
+ assertThat(receiver.isRunning()).isEqualTo(true);
+ }
+ }
+
+ private void verifyGatewaySenderState(MemberVM memberVM, boolean isPaused) {
+ memberVM.invoke(() -> verifySenderState(GATEWAY_SENDER_ID, true,
isPaused));
+ locator1Site2.invoke(
+ () -> validateGatewaySenderMXBeanProxy(getMember(memberVM.getVM()),
GATEWAY_SENDER_ID, true,
+ isPaused));
+ }
+
+ private static InternalDistributedMember getMember(final VM vm) {
+ return vm.invoke(() -> ClusterStartupRule.getCache().getMyId());
+ }
+
+ private void startClientToServer1Site2(final int serverPort) throws
Exception {
+ clientConnectedToServer1Site2 =
+ clusterStartupRule.startClientVM(8, c ->
c.withServerConnection(serverPort));
+ clientConnectedToServer1Site2.invoke(() -> {
+ ClusterStartupRule.clientCacheRule.createProxyRegion(REGION_NAME);
+ });
+ }
+
+ private void startClientToServer2Site2(final int serverPort) throws
Exception {
+ clientConnectedToServer2Site2 =
+ clusterStartupRule.startClientVM(4, c ->
c.withServerConnection(serverPort));
+ clientConnectedToServer2Site2.invoke(() -> {
+ ClusterStartupRule.clientCacheRule.createProxyRegion(REGION_NAME);
+ });
+ }
+
+ private void connectGfshToSite(MemberVM locator) throws Exception {
+ if (gfsh.isConnected()) {
+ gfsh.disconnect();
+ }
+ gfsh.connectAndVerify(locator);
+ }
+
+ public static class TestCacheWriterDelayWritingOfEntry<K, V> implements
CacheWriter<K, V> {
+ private final Map.Entry<Integer, Integer> entryToDelay;
+
+ private final Map.Entry<Integer, Integer> waitUntilEntry;
+
+ public TestCacheWriterDelayWritingOfEntry(Map.Entry<Integer, Integer>
entryToDelay,
+ Map.Entry<Integer, Integer> waitUntilEntry) {
+ this.entryToDelay = entryToDelay;
+ this.waitUntilEntry = waitUntilEntry;
+ }
+
+ @Override
+ public void beforeUpdate(EntryEvent<K, V> event) throws
CacheWriterException {
+ Region<Integer, Integer> region =
ClusterStartupRule.getCache().getRegion("/" + REGION_NAME);
+ int value = (Integer) event.getNewValue();
+ int key = (Integer) event.getKey();
+ if (key == entryToDelay.getKey() && value == entryToDelay.getValue()) {
+
InternalConflictResolutionReplicateRegionWithSerialGwsDistributedTest.ENTRY_CONFLICT_WINNER_HAS_REACHED_THE_REDUNDANT_SERVER
=
+ true;
+ await().untilAsserted(() ->
assertThat(region.get(waitUntilEntry.getKey()))
+ .isEqualTo(waitUntilEntry.getValue()));
+ }
+ }
+
+ @Override
+ public void beforeCreate(EntryEvent<K, V> event) throws
CacheWriterException {}
+
+ @Override
+ public void beforeDestroy(EntryEvent<K, V> event) throws
CacheWriterException {}
+
+ @Override
+ public void beforeRegionDestroy(RegionEvent<K, V> event) throws
CacheWriterException {}
+
+ @Override
+ public void beforeRegionClear(RegionEvent<K, V> event) throws
CacheWriterException {}
+ }
+}