This is an automated email from the ASF dual-hosted git repository.
mkevo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 3927ba9 GEODE-8202: Two-step serial gw sender threads start (#5900)
3927ba9 is described below
commit 3927ba9eeed5bee7e7639f926b48409c4b6dd3a7
Author: Alberto Bustamante Reyes <[email protected]>
AuthorDate: Mon Jan 18 13:57:31 2021 +0100
GEODE-8202: Two-step serial gw sender threads start (#5900)
* GEODE-8202: Two-step serial gw sender threads start
---
...iversWithSamePortAndHostnameForSendersTest.java | 127 +++++++++++-
.../geode-list-gateway-receivers-server1.gfsh | 20 ++
.../geode-list-gateway-receivers-server2.gfsh | 20 ++
.../codeAnalysis/sanctionedDataSerializables.txt | 8 +-
.../geode/cache/configuration/CacheConfig.java | 23 +++
.../org/apache/geode/cache/wan/GatewaySender.java | 10 +
.../geode/cache/wan/GatewaySenderFactory.java | 13 ++
.../internal/cache/wan/AbstractGatewaySender.java | 18 ++
.../wan/AbstractGatewaySenderEventProcessor.java | 8 +
.../internal/cache/wan/GatewaySenderAdvisor.java | 27 ++-
.../cache/wan/GatewaySenderAttributes.java | 6 +
...oncurrentSerialGatewaySenderEventProcessor.java | 57 ++++--
.../geode/internal/cache/xmlcache/CacheXml.java | 5 +-
.../internal/cache/xmlcache/CacheXmlGenerator.java | 10 +
.../internal/cache/xmlcache/CacheXmlParser.java | 11 ++
.../geode/management/internal/i18n/CliStrings.java | 4 +
.../geode.apache.org/schema/cache/cache-1.0.xsd | 1 +
.../gfsh/command-pages/create.html.md.erb | 5 +
.../cli/commands/CreateGatewaySenderCommand.java | 27 ++-
.../cli/functions/GatewaySenderCreateFunction.java | 7 +
.../cli/functions/GatewaySenderFunctionArgs.java | 6 +
.../sanctioned-geode-gfsh-serializables.txt | 2 +-
.../commands/CreateGatewaySenderCommandTest.java | 68 ++++++-
...CreateDestroyGatewaySenderCommandDUnitTest.java | 2 +-
.../wan/GatewaySenderEventRemoteDispatcher.java | 93 +++++++--
.../cache/wan/GatewaySenderFactoryImpl.java | 10 +-
.../RemoteSerialGatewaySenderEventProcessor.java | 2 +-
.../cache/wan/serial/SerialGatewaySenderImpl.java | 1 +
...atewaySenderEventRemoteDispatcherJUnitTest.java | 214 ++++++++++++++++++++-
29 files changed, 751 insertions(+), 54 deletions(-)
diff --git
a/geode-assembly/src/acceptanceTest/java/org/apache/geode/cache/wan/SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest.java
b/geode-assembly/src/acceptanceTest/java/org/apache/geode/cache/wan/SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest.java
index 6be6066..7247362 100644
---
a/geode-assembly/src/acceptanceTest/java/org/apache/geode/cache/wan/SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest.java
+++
b/geode-assembly/src/acceptanceTest/java/org/apache/geode/cache/wan/SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest.java
@@ -22,13 +22,17 @@ import static
org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static
org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
import java.io.File;
+import java.io.IOException;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.StringTokenizer;
+import java.util.Vector;
import com.palantir.docker.compose.DockerComposeRule;
import org.junit.BeforeClass;
@@ -71,14 +75,8 @@ import org.apache.geode.test.junit.categories.WanTest;
* traffic directed to the 2324 port to the receivers in a round robin fashion.
*
* - Another site consisting of a 1-server, 1-locator Geode cluster.
- * The server hosts a partition region (region-wan) and has a parallel gateway
receiver
+ * The server hosts a partition region (region-wan) and has a gateway sender
 * to send events to the remote site.
- *
- * The aim of the tests is verify that when several gateway receivers in a
remote site
- * share the same port and hostname-for-senders, the pings sent from the
gateway senders
- * reach the right gateway receiver and not just any of the receivers. Failure
to do this
- * may result in the closing of connections by a gateway receiver for not
having
- * received the ping in time.
*/
@Category({WanTest.class})
public class SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest {
@@ -122,6 +120,13 @@ public class
SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest {
super();
}
+ /**
+ * The aim of this test is to verify that when several gateway receivers in a
remote site
+ * share the same port and hostname-for-senders, the pings sent from the
gateway senders
+ * reach the right gateway receiver and not just any of the receivers.
Failure to do this
+ * may result in the closing of connections by a gateway receiver for not
having
+ * received the ping in time.
+ */
@Test
public void
testPingsToReceiversWithSamePortAndHostnameForSendersReachTheRightReceivers()
throws InterruptedException {
@@ -159,6 +164,105 @@ public class
SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest {
assertEquals(0, senderPoolDisconnects);
}
+ @Test
+ public void testSerialGatewaySenderThreadsConnectToSameReceiver() {
+ String senderId = "ln";
+ String regionName = "region-wan";
+ final int remoteLocPort = 20334;
+
+ int locPort = createLocator(VM.getVM(0), 1, remoteLocPort);
+
+ VM vm1 = VM.getVM(1);
+ createCache(vm1, locPort);
+
+ createGatewaySender(vm1, senderId, 2, false, 5,
+ 3, GatewaySender.DEFAULT_ORDER_POLICY);
+
+ createPartitionedRegion(vm1, regionName, senderId, 0, 10);
+
+ assertTrue(allDispatchersConnectedToSameReceiver(1));
+ assertTrue(allDispatchersConnectedToSameReceiver(2));
+
+ }
+
+ @Test
+ public void
testTwoSendersWithSameIdShouldUseSameValueForEnforceThreadsConnectToSameServer()
{
+ String senderId = "ln";
+ final int remoteLocPort = 20334;
+
+ int locPort = createLocator(VM.getVM(0), 1, remoteLocPort);
+
+ VM vm1 = VM.getVM(1);
+ createCache(vm1, locPort);
+
+ VM vm2 = VM.getVM(2);
+ createCache(vm2, locPort);
+
+ createGatewaySender(vm1, senderId, 2, false, 5,
+ 3, GatewaySender.DEFAULT_ORDER_POLICY);
+
+ Exception exception =
+ assertThrows(Exception.class, () -> createGatewaySender(vm2, senderId,
2, false, 5,
+ 3, GatewaySender.DEFAULT_ORDER_POLICY, false));
+ assertEquals(exception.getCause().getMessage(), "Cannot create Gateway
Sender " + senderId
+ + " with enforceThreadsConnectSameReceiver false because another cache
has the same Gateway Sender defined with enforceThreadsConnectSameReceiver
true");
+
+ }
+
+ private boolean allDispatchersConnectedToSameReceiver(int server) {
+
+ String gfshOutput = runListGatewayReceiversCommandInServer(server);
+ Vector<String> sendersConnectedToServer =
parseSendersConnectedFromGfshOutput(gfshOutput);
+ String firstSenderId = "";
+ for (String senderId : sendersConnectedToServer) {
+ if (firstSenderId.equals("")) {
+ firstSenderId = senderId;
+ } else {
+ assertEquals("Found two different senders (" + firstSenderId + " and "
+ senderId
+ + ") connected to same receiver in server " + server,
firstSenderId, senderId);
+ }
+ }
+ return true;
+ }
+
+
+ private String runListGatewayReceiversCommandInServer(int serverN) {
+ String result = "";
+ try {
+ result = docker.get().exec(options("-T"), "locator",
+ arguments("gfsh", "run",
+ "--file=/geode/scripts/geode-list-gateway-receivers-server" +
serverN + ".gfsh"));
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } finally {
+ return result;
+ }
+ }
+
+ private Vector<String> parseSendersConnectedFromGfshOutput(String
gfshOutput) {
+ String lines[] = gfshOutput.split(System.getProperty("line.separator"));
+ final String sendersConnectedColumnHeader = "Senders Connected";
+ String receiverInfo = null;
+ for (int i = 0; i < lines.length; i++) {
+ if (lines[i].contains(sendersConnectedColumnHeader)) {
+ receiverInfo = lines[i + 2];
+ break;
+ }
+ }
+ assertNotNull(
+ "Error parsing gfsh output. '" + sendersConnectedColumnHeader + "'
column header not found",
+ receiverInfo);
+ String[] tableRow = receiverInfo.split("\\|");
+ String sendersConnectedColumnValue = tableRow[3].trim();
+ Vector<String> senders = new Vector<String>();
+ for (String sender : sendersConnectedColumnValue.split(",")) {
+ senders.add(sender.trim());
+ }
+ return senders;
+ }
+
private int createLocator(VM memberVM, int dsId, int remoteLocPort) {
return memberVM.invoke("create locator", () -> {
Properties props = new Properties();
@@ -182,6 +286,14 @@ public class
SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest {
boolean isParallel, Integer batchSize,
int numDispatchers,
GatewaySender.OrderPolicy orderPolicy) {
+ createGatewaySender(vm, dsName, remoteDsId, isParallel, batchSize,
numDispatchers, orderPolicy,
+ true);
+ }
+
+ public static void createGatewaySender(VM vm, String dsName, int remoteDsId,
+ boolean isParallel, Integer batchSize,
+ int numDispatchers,
+ GatewaySender.OrderPolicy orderPolicy, boolean
enforceThreadsConnectToSameReceiver) {
vm.invoke(() -> {
final IgnoredException exln =
IgnoredException.addIgnoredException("Could not connect");
try {
@@ -191,6 +303,7 @@ public class
SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest {
gateway.setBatchSize(batchSize);
gateway.setDispatcherThreads(numDispatchers);
gateway.setOrderPolicy(orderPolicy);
+
gateway.setEnforceThreadsConnectSameReceiver(enforceThreadsConnectToSameReceiver);
gateway.create(dsName, remoteDsId);
} finally {
diff --git
a/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/scripts/geode-list-gateway-receivers-server1.gfsh
b/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/scripts/geode-list-gateway-receivers-server1.gfsh
new file mode 100644
index 0000000..a0d61bb
--- /dev/null
+++
b/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/scripts/geode-list-gateway-receivers-server1.gfsh
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+set variable --name=APP_RESULT_VIEWER --value=200
+connect --locator=locator[20334]
+list gateways --receivers-only --member=server1
diff --git
a/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/scripts/geode-list-gateway-receivers-server2.gfsh
b/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/scripts/geode-list-gateway-receivers-server2.gfsh
new file mode 100644
index 0000000..37a16dc
--- /dev/null
+++
b/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/scripts/geode-list-gateway-receivers-server2.gfsh
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+set variable --name=APP_RESULT_VIEWER --value=200
+connect --locator=locator[20334]
+list gateways --receivers-only --member=server2
diff --git
a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
index 65d8796..50ce9e4 100644
---
a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
+++
b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
@@ -1910,10 +1910,12 @@ org/apache/geode/internal/cache/versions/VersionTag,2
fromData,225
toData,254
-org/apache/geode/internal/cache/wan/GatewaySenderAdvisor$GatewaySenderProfile,4
-fromData,283
+org/apache/geode/internal/cache/wan/GatewaySenderAdvisor$GatewaySenderProfile,6
+fromData,17
+fromDataPre_GEODE_1_14_0_0,293
fromDataPre_GFE_8_0_0_0,188
-toData,271
+toData,17
+toDataPre_GEODE_1_14_0_0,281
toDataPre_GFE_8_0_0_0,236
org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackArgument,2
diff --git
a/geode-core/src/main/java/org/apache/geode/cache/configuration/CacheConfig.java
b/geode-core/src/main/java/org/apache/geode/cache/configuration/CacheConfig.java
index f1b119b..85fe90e 100644
---
a/geode-core/src/main/java/org/apache/geode/cache/configuration/CacheConfig.java
+++
b/geode-core/src/main/java/org/apache/geode/cache/configuration/CacheConfig.java
@@ -2656,6 +2656,8 @@ public class CacheConfig {
protected String orderPolicy;
@XmlAttribute(name = "group-transaction-events")
protected Boolean groupTransactionEvents;
+ @XmlAttribute(name = "enforce-threads-connect-same-receiver")
+ protected Boolean enforceThreadsConnectSameReceiver;
/**
* Gets the value of the gatewayEventFilters property.
@@ -3100,6 +3102,27 @@ public class CacheConfig {
this.orderPolicy = value;
}
+ /**
+ * Sets the value of the enforceThreadsConnectSameReceiver property.
+ *
+ * allowed object is
+ * {@link Boolean }
+ *
+ */
+ public void setEnforceThreadsConnectSameReceiver(Boolean value) {
+ this.enforceThreadsConnectSameReceiver = value;
+ }
+
+ /**
+ * Gets the value of the enforceThreadsConnectSameReceiver property.
+ *
+ * possible object is
+ * {@link Boolean }
+ *
+ */
+ public Boolean getEnforceThreadsConnectSameReceiver() {
+ return this.enforceThreadsConnectSameReceiver;
+ }
}
}
diff --git
a/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySender.java
b/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySender.java
index ffacf4b..5e0e9f1 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySender.java
@@ -147,6 +147,8 @@ public interface GatewaySender {
boolean DEFAULT_IS_FOR_INTERNAL_USE = false;
+ boolean DEFAULT_ENFORCE_THREADS_CONNECT_SAME_RECEIVER = false;
+
/**
* Retry a connection from sender to receiver after specified time interval
(in milliseconds) in
* case receiver is not up and running. Default is set to 1000 milliseconds
i.e. 1 second.
@@ -449,4 +451,12 @@ public interface GatewaySender {
*
*/
void destroy();
+
+ /**
+ * Returns enforceThreadsConnectSameReceiver boolean property for this
GatewaySender.
+ *
+ * @return enforceThreadsConnectSameReceiver boolean property for this
GatewaySender
+ *
+ */
+ boolean getEnforceThreadsConnectSameReceiver();
}
diff --git
a/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySenderFactory.java
b/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySenderFactory.java
index 7c99214..6c9e92b 100644
---
a/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySenderFactory.java
+++
b/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySenderFactory.java
@@ -191,6 +191,19 @@ public interface GatewaySenderFactory {
GatewaySenderFactory
setGatewayEventSubstitutionFilter(GatewayEventSubstitutionFilter filter);
/**
+ * If true, receiver member id is checked by all dispatcher threads when the
connection is
+ * established to ensure they connect to the same receiver. Instead of
starting all dispatcher
+ * threads in parallel, one thread is started first, and after that the rest
are started in
+ * parallel. Default is false.
+ *
+ * @param enforceThreadsConnectSameReceiver if true, dispatcher threads
verify that they are
+ * connected to the same receiver
+ *
+ */
+ GatewaySenderFactory setEnforceThreadsConnectSameReceiver(
+ boolean enforceThreadsConnectSameReceiver);
+
+ /**
* Creates a <code>GatewaySender</code> to communicate with remote
distributed system
*
* @param id unique id for this SerialGatewaySender
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index 7f6c8a1..4ea2c6d 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -171,6 +171,8 @@ public abstract class AbstractGatewaySender implements
InternalGatewaySender, Di
private ServerLocation serverLocation;
+ private String expectedReceiverUniqueId = "";
+
protected Object queuedEventsSync = new Object();
protected volatile boolean enqueuedAllTempQueueEvents = false;
@@ -237,6 +239,8 @@ public abstract class AbstractGatewaySender implements
InternalGatewaySender, Di
private final StatisticsClock statisticsClock;
+ protected boolean enforceThreadsConnectSameReceiver;
+
protected AbstractGatewaySender() {
statisticsClock = disabledClock();
}
@@ -275,6 +279,7 @@ public abstract class AbstractGatewaySender implements
InternalGatewaySender, Di
this.maxMemoryPerDispatcherQueue = this.queueMemory /
this.dispatcherThreads;
this.serialNumber = DistributionAdvisor.createSerialNumber();
this.isMetaQueue = attrs.isMetaQueue();
+ this.enforceThreadsConnectSameReceiver =
attrs.getEnforceThreadsConnectSameReceiver();
if (!(this.cache instanceof CacheCreation)) {
this.myDSId =
this.cache.getInternalDistributedSystem().getDistributionManager()
.getDistributedSystemId();
@@ -500,6 +505,11 @@ public abstract class AbstractGatewaySender implements
InternalGatewaySender, Di
}
@Override
+ public boolean getEnforceThreadsConnectSameReceiver() {
+ return this.enforceThreadsConnectSameReceiver;
+ }
+
+ @Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
@@ -1429,6 +1439,14 @@ public abstract class AbstractGatewaySender implements
InternalGatewaySender, Di
}
}
+ public void setExpectedReceiverUniqueId(String expectedReceiverUniqueId) {
+ this.expectedReceiverUniqueId = expectedReceiverUniqueId;
+ }
+
+ public String getExpectedReceiverUniqueId() {
+ return this.expectedReceiverUniqueId;
+ }
+
/**
* Has a reference to a GatewayEventImpl and has a timeout value.
*/
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 0609ec9..294121a 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -154,6 +154,14 @@ public abstract class AbstractGatewaySenderEventProcessor
extends LoggingThread
this.threadMonitoring = tMonitoring;
}
+ public void setExpectedReceiverUniqueId(String uniqueId) {
+ this.sender.setExpectedReceiverUniqueId(uniqueId);
+ }
+
+ public String getExpectedReceiverUniqueId() {
+ return this.sender.getExpectedReceiverUniqueId();
+ }
+
public Object getRunningStateLock() {
return runningStateLock;
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java
index adf80cb..6af0866 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java
@@ -232,6 +232,15 @@ public class GatewaySenderAdvisor extends
DistributionAdvisor {
"Cannot create Gateway Sender %s with isDiskSynchronous %s
because another cache has the same Gateway Sender defined with
isDiskSynchronous %s",
sp.Id, sp.isDiskSynchronous, sender.isDiskSynchronous()));
}
+ if
(sp.getDistributedMember().getVersion().isNotOlderThan(KnownVersion.GEODE_1_14_0))
{
+ if (sp.enforceThreadsConnectSameReceiver !=
sender.getEnforceThreadsConnectSameReceiver()) {
+ throw new IllegalStateException(
+ String.format(
+ "Cannot create Gateway Sender %s with
enforceThreadsConnectSameReceiver %s because another cache has the same Gateway
Sender defined with enforceThreadsConnectSameReceiver %s",
+ sp.Id, sp.enforceThreadsConnectSameReceiver,
+ sender.getEnforceThreadsConnectSameReceiver()));
+ }
+ }
}
/**
@@ -532,6 +541,8 @@ public class GatewaySenderAdvisor extends
DistributionAdvisor {
public ServerLocation serverLocation;
+ public boolean enforceThreadsConnectSameReceiver = false;
+
public GatewaySenderProfile(InternalDistributedMember memberId, int
version) {
super(memberId, version);
}
@@ -541,6 +552,12 @@ public class GatewaySenderAdvisor extends
DistributionAdvisor {
@Override
public void fromData(DataInput in,
DeserializationContext context) throws IOException,
ClassNotFoundException {
+ fromDataPre_GEODE_1_14_0_0(in, context);
+ this.enforceThreadsConnectSameReceiver = in.readBoolean();
+ }
+
+ public void fromDataPre_GEODE_1_14_0_0(DataInput in,
+ DeserializationContext context) throws IOException,
ClassNotFoundException {
super.fromData(in, context);
this.Id = DataSerializer.readString(in);
this.startTime = in.readLong();
@@ -578,11 +595,18 @@ public class GatewaySenderAdvisor extends
DistributionAdvisor {
this.serverLocation = new ServerLocation();
InternalDataSerializer.invokeFromData(this.serverLocation, in);
}
+ this.enforceThreadsConnectSameReceiver = in.readBoolean();
}
@Override
public void toData(DataOutput out,
SerializationContext context) throws IOException {
+ toDataPre_GEODE_1_14_0_0(out, context);
+ out.writeBoolean(enforceThreadsConnectSameReceiver);
+ }
+
+ public void toDataPre_GEODE_1_14_0_0(DataOutput out,
+ SerializationContext context) throws IOException {
super.toData(out, context);
DataSerializer.writeString(Id, out);
out.writeLong(startTime);
@@ -617,6 +641,7 @@ public class GatewaySenderAdvisor extends
DistributionAdvisor {
if (serverLocationFound) {
InternalDataSerializer.invokeToData(serverLocation, out);
}
+ out.writeBoolean(enforceThreadsConnectSameReceiver);
}
public void fromDataPre_GFE_8_0_0_0(DataInput in, DeserializationContext
context)
@@ -684,7 +709,7 @@ public class GatewaySenderAdvisor extends
DistributionAdvisor {
@Immutable
private static final KnownVersion[] serializationVersions =
- new KnownVersion[] {KnownVersion.GFE_80};
+ new KnownVersion[] {KnownVersion.GFE_80, KnownVersion.GEODE_1_14_0};
@Override
public KnownVersion[] getSerializationVersions() {
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAttributes.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAttributes.java
index 1457776..581b576 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAttributes.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAttributes.java
@@ -85,6 +85,9 @@ public class GatewaySenderAttributes {
public boolean forwardExpirationDestroy =
GatewaySender.DEFAULT_FORWARD_EXPIRATION_DESTROY;
+ public boolean enforceThreadsConnectSameReceiver =
+ GatewaySender.DEFAULT_ENFORCE_THREADS_CONNECT_SAME_RECEIVER;
+
public int getSocketBufferSize() {
return this.socketBufferSize;
}
@@ -205,4 +208,7 @@ public class GatewaySenderAttributes {
return this.forwardExpirationDestroy;
}
+ public boolean getEnforceThreadsConnectSameReceiver() {
+ return this.enforceThreadsConnectSameReceiver;
+ }
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
index 06f74ae..7adf996 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
@@ -179,12 +179,27 @@ public class ConcurrentSerialGatewaySenderEventProcessor
@Override
public void run() {
- for (int i = 0; i < this.processors.size(); i++) {
- if (logger.isDebugEnabled()) {
+ boolean isDebugEnabled = logger.isDebugEnabled();
+ if (this.sender.getEnforceThreadsConnectSameReceiver()) {
+ this.processors.get(0).start();
+ waitForRunningStatus(this.processors.get(0));
+ String receiverUniqueId =
this.processors.get(0).getExpectedReceiverUniqueId();
+ if (isDebugEnabled) {
+ logger.debug("First dispatcher is connected to " + receiverUniqueId);
+ }
+ for (int j = 1; j < this.processors.size(); j++) {
+ this.processors.get(j).setExpectedReceiverUniqueId(receiverUniqueId);
+ }
+ }
+
+ for (int i = this.sender.getEnforceThreadsConnectSameReceiver() ? 1 : 0; i
< this.processors
+ .size(); i++) {
+ if (isDebugEnabled) {
logger.debug("Starting the serialProcessor {}", i);
}
this.processors.get(i).start();
}
+
try {
waitForRunningStatus();
} catch (GatewaySenderException e) {
@@ -205,7 +220,7 @@ public class ConcurrentSerialGatewaySenderEventProcessor
try {
serialProcessor.join();
} catch (InterruptedException e) {
- if (logger.isDebugEnabled()) {
+ if (isDebugEnabled) {
logger.debug("Got InterruptedException while waiting for child
threads to finish.");
Thread.currentThread().interrupt();
}
@@ -219,24 +234,28 @@ public class ConcurrentSerialGatewaySenderEventProcessor
throw new UnsupportedOperationException();
}
- private void waitForRunningStatus() {
- for (SerialGatewaySenderEventProcessor serialProcessor : this.processors) {
- synchronized (serialProcessor.getRunningStateLock()) {
- while (serialProcessor.getException() == null &&
serialProcessor.isStopped()) {
- try {
- serialProcessor.getRunningStateLock().wait();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- Exception ex = serialProcessor.getException();
- if (ex != null) {
- throw new GatewaySenderException(
- String.format("Could not start a gateway sender %s because of
exception %s",
- new Object[] {this.sender.getId(), ex.getMessage()}),
- ex.getCause());
+ private void waitForRunningStatus(SerialGatewaySenderEventProcessor
serialProcessor) {
+ synchronized (serialProcessor.getRunningStateLock()) {
+ while (serialProcessor.getException() == null &&
serialProcessor.isStopped()) {
+ try {
+ serialProcessor.getRunningStateLock().wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
}
}
+ Exception ex = serialProcessor.getException();
+ if (ex != null) {
+ throw new GatewaySenderException(
+ String.format("Could not start a gateway sender %s because of
exception %s",
+ new Object[] {this.sender.getId(), ex.getMessage()}),
+ ex.getCause());
+ }
+ }
+ }
+
+ private void waitForRunningStatus() {
+ for (SerialGatewaySenderEventProcessor serialProcessor : this.processors) {
+ waitForRunningStatus(serialProcessor);
}
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXml.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXml.java
index 1ac84a4..9970c55 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXml.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXml.java
@@ -433,8 +433,11 @@ public abstract class CacheXml implements EntityResolver2,
ErrorHandler {
protected static final String ORDER_POLICY = "order-policy";
/** The name of the <code>remote-distributed-system</code> attribute */
protected static final String REMOTE_DISTRIBUTED_SYSTEM_ID =
"remote-distributed-system-id";
+ /** The name of the <code>group-transaction-events</code> attribute */
protected static final String GROUP_TRANSACTION_EVENTS =
"group-transaction-events";
-
+ /** The name of the <code>enforce-threads-connect-same-receiver</code>
attribute */
+ protected static final String ENFORCE_THREADS_CONNECT_SAME_RECEIVER =
+ "enforce-threads-connect-same-receiver";
/** The name of the <code>bind-address</code> attribute */
protected static final String BIND_ADDRESS = "bind-address";
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlGenerator.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlGenerator.java
index ea19b88..04ac07e 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlGenerator.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlGenerator.java
@@ -1394,6 +1394,16 @@ public class CacheXmlGenerator extends CacheXml
implements XMLReader {
}
}
+ // enforce-threads-connect-same-receiver
+ if (version.compareTo(CacheXmlVersion.GEODE_1_0) >= 0) {
+ if (generateDefaults()
+ || sender
+ .getEnforceThreadsConnectSameReceiver() !=
GatewaySender.DEFAULT_ENFORCE_THREADS_CONNECT_SAME_RECEIVER) {
+ atts.addAttribute("", "", ENFORCE_THREADS_CONNECT_SAME_RECEIVER, "",
+ String.valueOf(sender.getEnforceThreadsConnectSameReceiver()));
+ }
+ }
+
handler.startElement("", GATEWAY_SENDER, GATEWAY_SENDER, atts);
for (GatewayEventFilter gef : sender.getGatewayEventFilters()) {
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlParser.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlParser.java
index c775086..ce4d211 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlParser.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlParser.java
@@ -707,6 +707,17 @@ public class CacheXmlParser extends CacheXml implements
ContentHandler {
gatewaySenderFactory
.setGroupTransactionEvents(Boolean.parseBoolean(groupTransactionEvents));
}
+
+ String enforceThreadsConnectSameReceiver =
atts.getValue(ENFORCE_THREADS_CONNECT_SAME_RECEIVER);
+ if (enforceThreadsConnectSameReceiver == null) {
+ gatewaySenderFactory
+ .setEnforceThreadsConnectSameReceiver(
+ GatewaySender.DEFAULT_ENFORCE_THREADS_CONNECT_SAME_RECEIVER);
+ } else {
+ gatewaySenderFactory
+ .setEnforceThreadsConnectSameReceiver(
+ Boolean.parseBoolean(enforceThreadsConnectSameReceiver));
+ }
}
private void startGatewayReceiver(Attributes atts) {
diff --git
a/geode-core/src/main/java/org/apache/geode/management/internal/i18n/CliStrings.java
b/geode-core/src/main/java/org/apache/geode/management/internal/i18n/CliStrings.java
index 4ce4416..3b08621 100644
---
a/geode-core/src/main/java/org/apache/geode/management/internal/i18n/CliStrings.java
+++
b/geode-core/src/main/java/org/apache/geode/management/internal/i18n/CliStrings.java
@@ -2267,6 +2267,10 @@ public class CliStrings {
"GatewaySender \"{0}\" created on \"{1}\"";
public static final String
CREATE_GATEWAYSENDER__MSG__CAN_NOT_CREATE_DIFFERENT_VERSIONS =
"Gateway Sender cannot be created until all members are the current
version";
+ public static final String
CREATE_GATEWAYSENDER__ENFORCE_THREADS_CONNECT_SAME_RECEIVER =
+ "enforce-threads-connect-same-receiver";
+ public static final String
CREATE_GATEWAYSENDER__ENFORCE_THREADS_CONNECT_SAME_RECEIVER__HELP =
+ "Whether or not the sender threads have to verify the receiver member id
to verify if they are connected to the same server.";
/* start gateway-sender */
public static final String START_GATEWAYSENDER = "start gateway-sender";
diff --git
a/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd
b/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd
index 53b57f2..db6841a 100755
---
a/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd
+++
b/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd
@@ -203,6 +203,7 @@ declarative caching XML file elements unless indicated
otherwise.
<xsd:attribute name="dispatcher-threads" type="xsd:string"
use="optional" />
<xsd:attribute name="order-policy" type="xsd:string"
use="optional" />
<xsd:attribute name="group-transaction-events" type="xsd:boolean"
use="optional" />
+ <xsd:attribute name="enforce-threads-connect-same-receiver"
type="xsd:boolean" use="optional" />
</xsd:complexType>
</xsd:element>
diff --git a/geode-docs/tools_modules/gfsh/command-pages/create.html.md.erb
b/geode-docs/tools_modules/gfsh/command-pages/create.html.md.erb
index 30b4da1..11b4192 100644
--- a/geode-docs/tools_modules/gfsh/command-pages/create.html.md.erb
+++ b/geode-docs/tools_modules/gfsh/command-pages/create.html.md.erb
@@ -638,7 +638,12 @@ create gateway-sender --id=value
--remote-distributed-system-id=value
<p>Only allowed to be set on gateway senders with the <code class="ph
codeph">parallel</code> attribute set to false and <code class="ph
codeph">dispatcher-threads</code> attribute equal to 1, or on gateway senders
with the <code class="ph codeph">parallel</code> attribute set to true. Also,
the <code class="ph codeph">enable-batch-conflation</code> attribute of the
gateway sender must be set to false.</p>
<p><b>Note:</b> In order to work for a transaction, the regions to which the
transaction events belong must be replicated by the same set of senders with
this flag enabled.</p>
<p><b>Note:</b> If the above condition is not fulfilled or under very high
load traffic conditions, it may not be guaranteed that all the events for a
transaction will be sent in the same batch, even if <code class="ph
codeph">group-transaction-events</code> is enabled. The number of batches sent
with incomplete transactions can be retrieved from the <code class="ph
codeph">GatewaySenderMXBean</code> bean.</p></td>
+<td>false</td>
</td>
+</tr>
+<tr>
+<td><span class="keyword
parmname">\-\-enforce-threads-connect-same-receiver</span></td>
+<td>This parameter applies only to serial gateway senders. If true, receiver
member id is checked by all dispatcher threads when the connection is
established to ensure they connect to the same receiver. Instead of starting
all dispatcher threads in parallel, one thread is started first, and after that
the rest are started in parallel.</td>
<td>false</td>
</tr>
</tbody>
diff --git
a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommand.java
b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommand.java
index a6b40b9..474153b 100644
---
a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommand.java
+++
b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommand.java
@@ -137,14 +137,20 @@ public class CreateGatewaySenderCommand extends
SingleGfshCommand {
help = CliStrings.CREATE_GATEWAYSENDER__GATEWAYEVENTFILTER__HELP)
String[] gatewayEventFilters,
@CliOption(key = CliStrings.CREATE_GATEWAYSENDER__GATEWAYTRANSPORTFILTER,
- help =
CliStrings.CREATE_GATEWAYSENDER__GATEWAYTRANSPORTFILTER__HELP) String[]
gatewayTransportFilter) {
+ help =
CliStrings.CREATE_GATEWAYSENDER__GATEWAYTRANSPORTFILTER__HELP) String[]
gatewayTransportFilter,
+
+ @CliOption(key =
CliStrings.CREATE_GATEWAYSENDER__ENFORCE_THREADS_CONNECT_SAME_RECEIVER,
+ specifiedDefaultValue = "true",
+ unspecifiedDefaultValue = "false",
+ help =
CliStrings.CREATE_GATEWAYSENDER__ENFORCE_THREADS_CONNECT_SAME_RECEIVER__HELP)
Boolean enforceThreadsConnectSameReceiver) {
CacheConfig.GatewaySender configuration =
buildConfiguration(id, remoteDistributedSystemId, parallel,
manualStart,
socketBufferSize, socketReadTimeout, enableBatchConflation,
batchSize,
batchTimeInterval, enablePersistence, diskStoreName,
diskSynchronous, maxQueueMemory,
alertThreshold, dispatcherThreads, orderPolicy == null ? null :
orderPolicy.name(),
- gatewayEventFilters, gatewayTransportFilter,
groupTransactionEvents);
+ gatewayEventFilters, gatewayTransportFilter,
groupTransactionEvents,
+ enforceThreadsConnectSameReceiver);
GatewaySenderFunctionArgs gatewaySenderFunctionArgs =
new GatewaySenderFunctionArgs(configuration);
@@ -228,7 +234,8 @@ public class CreateGatewaySenderCommand extends
SingleGfshCommand {
String orderPolicy,
String[] gatewayEventFilters,
String[] gatewayTransportFilters,
- Boolean groupTransactionEvents) {
+ Boolean groupTransactionEvents,
+ Boolean enforceThreadsConnectSameReceiver) {
CacheConfig.GatewaySender sender = new CacheConfig.GatewaySender();
sender.setId(id);
sender.setRemoteDistributedSystemId(int2string(remoteDSId));
@@ -253,7 +260,7 @@ public class CreateGatewaySenderCommand extends
SingleGfshCommand {
if (gatewayTransportFilters != null) {
sender.getGatewayTransportFilters().addAll(stringsToDeclarableTypes(gatewayTransportFilters));
}
-
+
sender.setEnforceThreadsConnectSameReceiver(enforceThreadsConnectSameReceiver);
return sender;
}
@@ -284,6 +291,10 @@ public class CreateGatewaySenderCommand extends
SingleGfshCommand {
Boolean batchConflationEnabled =
(Boolean) parseResult
.getParamValue(CliStrings.CREATE_GATEWAYSENDER__ENABLEBATCHCONFLATION);
+ Boolean enforceThreadsConnectSameReceiver =
+ (Boolean) parseResult
+ .getParamValue(
+
CliStrings.CREATE_GATEWAYSENDER__ENFORCE_THREADS_CONNECT_SAME_RECEIVER);
if (dispatcherThreads != null && dispatcherThreads > 1 && orderPolicy ==
null) {
return ResultModel.createError(
@@ -306,6 +317,14 @@ public class CreateGatewaySenderCommand extends
SingleGfshCommand {
"Gateway Sender cannot be created with both
--group-transaction-events and --enable-batch-conflation.");
}
+ if (parallel && enforceThreadsConnectSameReceiver) {
+ return ResultModel
+ .createError(
+ "Option --" +
CliStrings.CREATE_GATEWAYSENDER__ENFORCE_THREADS_CONNECT_SAME_RECEIVER
+ + " only applies to serial gateway senders.");
+
+ }
+
return ResultModel.createInfo("");
}
}
diff --git
a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/GatewaySenderCreateFunction.java
b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/GatewaySenderCreateFunction.java
index 00efaa5..9d7e75f 100644
---
a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/GatewaySenderCreateFunction.java
+++
b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/GatewaySenderCreateFunction.java
@@ -173,6 +173,13 @@ public class GatewaySenderCreateFunction implements
InternalFunction<GatewaySend
gatewayTransportFilterKlass,
CliStrings.CREATE_GATEWAYSENDER__GATEWAYTRANSPORTFILTER));
}
}
+
+ Boolean enforceThreadsConnectSameReceiver =
+ gatewaySenderCreateArgs.getEnforceThreadsConnectSameReceiver();
+ if (enforceThreadsConnectSameReceiver != null) {
+
gateway.setEnforceThreadsConnectSameReceiver(enforceThreadsConnectSameReceiver);
+ }
+
return gateway.create(gatewaySenderCreateArgs.getId(),
gatewaySenderCreateArgs.getRemoteDistributedSystemId());
}
diff --git
a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/GatewaySenderFunctionArgs.java
b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/GatewaySenderFunctionArgs.java
index 2b08ef9..dd70bda 100644
---
a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/GatewaySenderFunctionArgs.java
+++
b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/GatewaySenderFunctionArgs.java
@@ -46,6 +46,7 @@ public class GatewaySenderFunctionArgs implements
Serializable {
// array of fully qualified class names of the filters
private final List<String> gatewayEventFilters;
private final List<String> gatewayTransportFilters;
+ private final Boolean enforceThreadsConnectSameReceiver;
public GatewaySenderFunctionArgs(CacheConfig.GatewaySender sender) {
this.id = sender.getId();
@@ -77,6 +78,7 @@ public class GatewaySenderFunctionArgs implements
Serializable {
.stream().map(DeclarableType::getClassName)
.collect(Collectors.toList()))
.orElse(null);
+ this.enforceThreadsConnectSameReceiver =
sender.getEnforceThreadsConnectSameReceiver();
}
private Integer string2int(String x) {
@@ -158,4 +160,8 @@ public class GatewaySenderFunctionArgs implements
Serializable {
public List<String> getGatewayTransportFilter() {
return this.gatewayTransportFilters;
}
+
+ /**
+ * Returns the enforce-threads-connect-same-receiver value that the constructor copied
+ * from the supplied {@code CacheConfig.GatewaySender}.
+ *
+ * @return the stored boxed {@code Boolean}; NOTE(review): presumably may be null when the
+ *         configuration did not set it - confirm against CacheConfig.GatewaySender
+ */
+ public Boolean getEnforceThreadsConnectSameReceiver() {
+ return this.enforceThreadsConnectSameReceiver;
+ }
}
diff --git
a/geode-gfsh/src/main/resources/org/apache/geode/management/internal/sanctioned-geode-gfsh-serializables.txt
b/geode-gfsh/src/main/resources/org/apache/geode/management/internal/sanctioned-geode-gfsh-serializables.txt
index c60d599..b368b1f 100644
---
a/geode-gfsh/src/main/resources/org/apache/geode/management/internal/sanctioned-geode-gfsh-serializables.txt
+++
b/geode-gfsh/src/main/resources/org/apache/geode/management/internal/sanctioned-geode-gfsh-serializables.txt
@@ -65,7 +65,7 @@
org/apache/geode/management/internal/cli/functions/GatewayReceiverCreateFunction
org/apache/geode/management/internal/cli/functions/GatewaySenderCreateFunction,true,8746830191680509335
org/apache/geode/management/internal/cli/functions/GatewaySenderDestroyFunction,true,1
org/apache/geode/management/internal/cli/functions/GatewaySenderDestroyFunctionArgs,true,3848480256348119530,id:java/lang/String,ifExists:boolean
-org/apache/geode/management/internal/cli/functions/GatewaySenderFunctionArgs,true,4636678328980816780,alertThreshold:java/lang/Integer,batchSize:java/lang/Integer,batchTimeInterval:java/lang/Integer,diskStoreName:java/lang/String,diskSynchronous:java/lang/Boolean,dispatcherThreads:java/lang/Integer,enableBatchConflation:java/lang/Boolean,enablePersistence:java/lang/Boolean,gatewayEventFilters:java/util/List,gatewayTransportFilters:java/util/List,groupTransactionEvents:java/lang/Boolean,i
[...]
+org/apache/geode/management/internal/cli/functions/GatewaySenderFunctionArgs,true,4636678328980816780,alertThreshold:java/lang/Integer,batchSize:java/lang/Integer,batchTimeInterval:java/lang/Integer,diskStoreName:java/lang/String,diskSynchronous:java/lang/Boolean,dispatcherThreads:java/lang/Integer,enableBatchConflation:java/lang/Boolean,enablePersistence:java/lang/Boolean,gatewayEventFilters:java/util/List,gatewayTransportFilters:java/util/List,groupTransactionEvents:java/lang/Boolean,i
[...]
org/apache/geode/management/internal/cli/functions/GetMemberConfigInformationFunction,true,1
org/apache/geode/management/internal/cli/functions/GetRegionDescriptionFunction,true,1
org/apache/geode/management/internal/cli/functions/GetRegionsFunction,true,1
diff --git
a/geode-gfsh/src/test/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommandTest.java
b/geode-gfsh/src/test/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommandTest.java
index 2784ba5..585158d 100644
---
a/geode-gfsh/src/test/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommandTest.java
+++
b/geode-gfsh/src/test/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommandTest.java
@@ -275,7 +275,7 @@ public class CreateGatewaySenderCommandTest {
assertThat(argsArgumentCaptor.getValue().getGatewayEventFilter()).isNotNull().isEmpty();
assertThat(argsArgumentCaptor.getValue().getGatewayTransportFilter()).isNotNull().isEmpty();
assertThat(argsArgumentCaptor.getValue().mustGroupTransactionEvents()).isNotNull();
-
+
assertThat(argsArgumentCaptor.getValue().getEnforceThreadsConnectSameReceiver()).isFalse();
}
@Test
@@ -347,4 +347,70 @@ public class CreateGatewaySenderCommandTest {
assertThat(argsArgumentCaptor.getValue().mustGroupTransactionEvents()).isFalse();
}
+
+ /**
+ * Combining {@code --parallel} with {@code --enforce-threads-connect-same-receiver} must
+ * end in an error whose output states that the option only applies to serial senders.
+ */
+ @Test
+ public void
testEnforceThreadsConnectSameReceiverCannotBeUsedForParallelSenders() {
+ gfsh.executeAndAssertThat(command,
+ "create gateway-sender --id=1 --remote-distributed-system-id=1
--parallel --enforce-threads-connect-same-receiver")
+ .statusIsError()
+ .containsOutput(
+ "Option --" +
CliStrings.CREATE_GATEWAYSENDER__ENFORCE_THREADS_CONNECT_SAME_RECEIVER
+ + " only applies to serial gateway senders.");
+ }
+
+ /**
+ * Giving the option without a value must succeed and the captured function arguments
+ * must report the flag as true.
+ */
+ @Test
+ public void
testEnforceThreadsConnectSameReceiverIsTrueWhenUsedWithoutValue() {
+ // Stub member lookup and register one OK function result so the command can complete.
+ doReturn(mock(Set.class)).when(command).getMembers(any(), any());
+ cliFunctionResult =
+ new CliFunctionResult("member", CliFunctionResult.StatusState.OK,
"cliFunctionResult");
+ functionResults.add(cliFunctionResult);
+ gfsh.executeAndAssertThat(command,
+ "create gateway-sender --id=1 --remote-distributed-system-id=1
--enforce-threads-connect-same-receiver")
+ .statusIsSuccess();
+ // Capture the arguments handed to the create function to inspect the flag.
+ verify(command).executeAndGetFunctionResult(any(),
argsArgumentCaptor.capture(), any());
+
+
assertThat(argsArgumentCaptor.getValue().getEnforceThreadsConnectSameReceiver()).isTrue();
+ }
+
+ /**
+ * Giving the option explicitly as {@code =false} must succeed and the captured function
+ * arguments must report the flag as false.
+ */
+ @Test
+ public void testEnforceThreadsConnectSameReceiverIsFalseWhenSetToFalse() {
+ // Stub member lookup and register one OK function result so the command can complete.
+ doReturn(mock(Set.class)).when(command).getMembers(any(), any());
+ cliFunctionResult =
+ new CliFunctionResult("member", CliFunctionResult.StatusState.OK,
"cliFunctionResult");
+ functionResults.add(cliFunctionResult);
+ gfsh.executeAndAssertThat(command,
+ "create gateway-sender --id=1 --remote-distributed-system-id=1
--enforce-threads-connect-same-receiver=false")
+ .statusIsSuccess();
+ // Capture the arguments handed to the create function to inspect the flag.
+ verify(command).executeAndGetFunctionResult(any(),
argsArgumentCaptor.capture(), any());
+
+
assertThat(argsArgumentCaptor.getValue().getEnforceThreadsConnectSameReceiver()).isFalse();
+ }
+
+ /**
+ * Giving the option explicitly as {@code =true} must succeed and the captured function
+ * arguments must report the flag as true.
+ */
+ @Test
+ public void testEnforceThreadsConnectSameReceiverIsTrueWhenSetToTrue() {
+ // Stub member lookup and register one OK function result so the command can complete.
+ doReturn(mock(Set.class)).when(command).getMembers(any(), any());
+ cliFunctionResult =
+ new CliFunctionResult("member", CliFunctionResult.StatusState.OK,
"cliFunctionResult");
+ functionResults.add(cliFunctionResult);
+ gfsh.executeAndAssertThat(command,
+ "create gateway-sender --id=1 --remote-distributed-system-id=1
--enforce-threads-connect-same-receiver=true")
+ .statusIsSuccess();
+ // Capture the arguments handed to the create function to inspect the flag.
+ verify(command).executeAndGetFunctionResult(any(),
argsArgumentCaptor.capture(), any());
+
+
assertThat(argsArgumentCaptor.getValue().getEnforceThreadsConnectSameReceiver()).isTrue();
+ }
+
+ /**
+ * Omitting the option entirely must succeed and the captured function arguments must
+ * report the flag as false.
+ */
+ @Test
+ public void testEnforceThreadsConnectSameReceiverIsFalseByDefault() {
+ // Stub member lookup and register one OK function result so the command can complete.
+ doReturn(mock(Set.class)).when(command).getMembers(any(), any());
+ cliFunctionResult =
+ new CliFunctionResult("member", CliFunctionResult.StatusState.OK,
"cliFunctionResult");
+ functionResults.add(cliFunctionResult);
+ gfsh.executeAndAssertThat(command,
+ "create gateway-sender --id=1 --remote-distributed-system-id=1")
+ .statusIsSuccess();
+ // Capture the arguments handed to the create function to inspect the flag.
+ verify(command).executeAndGetFunctionResult(any(),
argsArgumentCaptor.capture(), any());
+
+
assertThat(argsArgumentCaptor.getValue().getEnforceThreadsConnectSameReceiver()).isFalse();
+ }
}
diff --git
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/CreateDestroyGatewaySenderCommandDUnitTest.java
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/CreateDestroyGatewaySenderCommandDUnitTest.java
index 734ec8e..854329e 100644
---
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/CreateDestroyGatewaySenderCommandDUnitTest.java
+++
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/CreateDestroyGatewaySenderCommandDUnitTest.java
@@ -112,7 +112,7 @@ public class CreateDestroyGatewaySenderCommandDUnitTest
implements Serializable
String xml =
locator.getConfigurationPersistenceService().getConfiguration("cluster")
.getCacheXmlContent();
assertThat(xml).contains(
- "<gateway-sender id=\"ln\" remote-distributed-system-id=\"2\"
parallel=\"false\" manual-start=\"false\" enable-batch-conflation=\"false\"
enable-persistence=\"false\" disk-synchronous=\"true\"
group-transaction-events=\"false\"/>");
+ "<gateway-sender id=\"ln\" remote-distributed-system-id=\"2\"
parallel=\"false\" manual-start=\"false\" enable-batch-conflation=\"false\"
enable-persistence=\"false\" disk-synchronous=\"true\"
group-transaction-events=\"false\"
enforce-threads-connect-same-receiver=\"false\"/>");
});
// destroy gateway sender and verify AEQs cleaned up
diff --git
a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
index 1199fb9..4b7e330 100644
---
a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
+++
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
@@ -18,8 +18,10 @@ import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.Vector;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
+import java.util.regex.Pattern;
import org.apache.logging.log4j.Logger;
@@ -63,6 +65,9 @@ public class GatewaySenderEventRemoteDispatcher implements
GatewaySenderEventDis
private ReentrantReadWriteLock connectionLifeCycleLock = new
ReentrantReadWriteLock();
+ protected static final String
maxAttemptsReachedConnectingServerIdExceptionMessage =
+ "Reached max attempts number trying to connect to desired server id";
+
/*
* Called after each attempt at processing an outbound (dispatch) or inbound
(ack)
* message, whether the attempt is successful or not. The purpose is
testability.
@@ -86,7 +91,6 @@ public class GatewaySenderEventRemoteDispatcher implements
GatewaySenderEventDis
public
GatewaySenderEventRemoteDispatcher(AbstractGatewaySenderEventProcessor
eventProcessor) {
this.processor = eventProcessor;
this.sender = eventProcessor.getSender();
- // this.ackReaderThread = new AckReaderThread(sender);
try {
initializeConnection();
} catch (GatewaySenderException e) {
@@ -362,11 +366,70 @@ public class GatewaySenderEventRemoteDispatcher
implements GatewaySenderEventDis
}
}
+ /**
+ * Makes sure the returned connection targets the receiver whose unique id is recorded on
+ * the event processor, replacing the given connection until the ids match.
+ *
+ * <p>
+ * If the processor's expected receiver id is the empty string, the id of the server behind
+ * {@code con} is stored on the processor as the expected one and {@code con} is returned
+ * as is. Otherwise, while the connected server id differs from the expected id, the
+ * connection is destroyed, returned to the sender's proxy, and a new one is acquired from
+ * it. The attempt limit starts at 5 and is raised by 5 every time a non-matching server id
+ * is observed for the first time.
+ *
+ * @param con the connection to check
+ * @return a connection whose endpoint member unique id equals the expected receiver id
+ * @throws ServerConnectivityException if the attempt counter reaches the current limit
+ *         before a matching connection is obtained
+ */
+ Connection retryInitializeConnection(Connection con) {
+ final boolean isDebugEnabled = logger.isDebugEnabled();
+ String connectedServerId = con.getEndpoint().getMemberId().getUniqueId();
+ String expectedServerId = this.processor.getExpectedReceiverUniqueId();
+
+ // No expected id recorded yet: adopt this connection's server as the target.
+ if (expectedServerId.equals("")) {
+ if (isDebugEnabled) {
+ logger.debug("First dispatcher connected to server " +
connectedServerId);
+ }
+ this.processor.setExpectedReceiverUniqueId(connectedServerId);
+ return con;
+ }
+
+ int attempt = 0;
+ final int attemptsPerServer = 5;
+ int maxAttempts = attemptsPerServer;
+ // Ids of non-matching servers already seen during this call.
+ Vector<String> notExpectedServerIds = new Vector<String>();
+ boolean connectedToExpectedReceiver =
connectedServerId.equals(expectedServerId);
+ while (!connectedToExpectedReceiver) {
+
+ if (isDebugEnabled) {
+ logger.debug("Dispatcher wants to connect to [" + expectedServerId
+ + "] but got connection to [" + connectedServerId + "]");
+ }
+ attempt++;
+ // A non-matching server seen for the first time extends the attempt limit.
+ if (!notExpectedServerIds.contains(connectedServerId)) {
+ if (isDebugEnabled) {
+ logger.debug(
+ "Increasing dispatcher connection max retries number due to
connection to unknown server ("
+ + connectedServerId + ")");
+ }
+ notExpectedServerIds.add(connectedServerId);
+ maxAttempts += attemptsPerServer;
+ }
+
+ // Give up once the limit is hit; the current connection is not released here.
+ if (attempt >= maxAttempts) {
+ throw new
ServerConnectivityException(maxAttemptsReachedConnectingServerIdExceptionMessage
+ + " [" + expectedServerId + "] (" + maxAttempts + " attempts).");
+ }
+
+ // Discard the wrong connection and ask the proxy for another one.
+ con.destroy();
+ this.sender.getProxy().returnConnection(con);
+ con = this.sender.getProxy().acquireConnection();
+
+ connectedServerId = con.getEndpoint().getMemberId().getUniqueId();
+ if (connectedServerId.equals(expectedServerId)) {
+ connectedToExpectedReceiver = true;
+ }
+ }
+
+ if (isDebugEnabled) {
+ logger.debug("Dispatcher connected to expected endpoint " +
connectedServerId
+ + " after " + attempt + " retries.");
+ }
+ return con;
+ }
+
/**
* Initializes the <code>Connection</code>.
*
*/
+ @VisibleForTesting
void initializeConnection() throws GatewaySenderException,
GemFireSecurityException {
+ final boolean isDebugEnabled = logger.isDebugEnabled();
if (ackReaderThread != null) {
ackReaderThread.shutDownAckReaderConnection(connection);
}
@@ -397,26 +460,24 @@ public class GatewaySenderEventRemoteDispatcher
implements GatewaySenderEventDis
synchronized (this.sender.getLockForConcurrentDispatcher()) {
ServerLocation server = this.sender.getServerLocation();
if (server != null) {
- if (logger.isDebugEnabled()) {
+ if (isDebugEnabled) {
logger.debug("ServerLocation is: {}. Connecting to this
serverLocation...", server);
}
con = this.sender.getProxy().acquireConnection(server);
} else {
- if (logger.isDebugEnabled()) {
+ if (isDebugEnabled) {
logger.debug("ServerLocation is null. Creating new connection.
");
}
con = this.sender.getProxy().acquireConnection();
- // Acquired connection from pool!! Update the server location
- // information in the sender and
- // distribute the information to other senders ONLY IF THIS
SENDER
- // IS
- // PRIMARY
- if (this.sender.isPrimary()) {
- if (sender.getServerLocation() == null) {
- sender.setServerLocation(con.getServer());
- }
- new UpdateAttributesProcessor(this.sender).distribute(false);
+ }
+ if (this.sender.getEnforceThreadsConnectSameReceiver()) {
+ con = retryInitializeConnection(con);
+ }
+ if (this.sender.isPrimary()) {
+ if (sender.getServerLocation() == null) {
+ sender.setServerLocation(con.getServer());
}
+ new UpdateAttributesProcessor(this.sender).distribute(false);
}
}
}
@@ -486,6 +547,12 @@ public class GatewaySenderEventRemoteDispatcher implements
GatewaySenderEventDis
"No available connection was found, but the following active
servers exist: %s",
buffer.toString());
}
+ if (this.sender.getEnforceThreadsConnectSameReceiver() && e.getMessage()
!= null) {
+ if
(Pattern.compile(maxAttemptsReachedConnectingServerIdExceptionMessage + ".*")
+ .matcher(e.getMessage()).find()) {
+ ioMsg += " " + e.getMessage();
+ }
+ }
IOException ex = new IOException(ioMsg);
gse = new GatewaySenderException(
String.format("%s : Could not connect due to: %s",
diff --git
a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderFactoryImpl.java
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderFactoryImpl.java
index 2a7cfd7..c0d2051 100644
---
a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderFactoryImpl.java
+++
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderFactoryImpl.java
@@ -199,6 +199,13 @@ public class GatewaySenderFactoryImpl implements
InternalGatewaySenderFactory {
}
@Override
+ public GatewaySenderFactory setEnforceThreadsConnectSameReceiver(
+ boolean enforceThreadsConnectSameReceiver) {
+ // Record the flag on this factory's attributes object (this.attrs).
+ this.attrs.enforceThreadsConnectSameReceiver =
enforceThreadsConnectSameReceiver;
+ // Return the factory itself so further setter calls can be chained.
+ return this;
+ }
+
+ @Override
public GatewaySender create(String id, int remoteDSId) {
int myDSId =
InternalDistributedSystem.getAnyInstance().getDistributionManager()
.getDistributedSystemId();
@@ -291,7 +298,6 @@ public class GatewaySenderFactoryImpl implements
InternalGatewaySenderFactory {
if (this.cache instanceof GemFireCacheImpl) {
sender = new SerialGatewaySenderImpl(cache, statisticsClock, attrs);
this.cache.addGatewaySender(sender);
-
if (!this.attrs.isManualStart()) {
sender.start();
}
@@ -394,5 +400,7 @@ public class GatewaySenderFactoryImpl implements
InternalGatewaySenderFactory {
}
this.attrs.eventSubstitutionFilter =
senderCreation.getGatewayEventSubstitutionFilter();
this.attrs.groupTransactionEvents =
senderCreation.mustGroupTransactionEvents();
+ this.attrs.enforceThreadsConnectSameReceiver =
+ senderCreation.getEnforceThreadsConnectSameReceiver();
}
}
diff --git
a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java
index f18ce81..ef3e599 100644
---
a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java
+++
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java
@@ -35,7 +35,7 @@ public class RemoteSerialGatewaySenderEventProcessor extends
SerialGatewaySender
@Override
public void initializeEventDispatcher() {
if (logger.isDebugEnabled()) {
- logger.debug(" Creating the GatewayEventRemoteDispatcher");
+ logger.debug("Creating the GatewayEventRemoteDispatcher");
}
// In case of serial there is a way to create gatewaySender and attach
// asyncEventListener. Not sure of the use-case but there are dunit tests
diff --git
a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
index 3474b4a..97436a2 100644
---
a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
+++
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
@@ -228,6 +228,7 @@ public class SerialGatewaySenderImpl extends
AbstractRemoteGatewaySender {
pf.dispatcherThreads = getDispatcherThreads();
pf.orderPolicy = getOrderPolicy();
pf.serverLocation = this.getServerLocation();
+ pf.enforceThreadsConnectSameReceiver =
getEnforceThreadsConnectSameReceiver();
}
@Override
diff --git
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherJUnitTest.java
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherJUnitTest.java
index 7cfe1f5..8b35ab1 100644
---
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherJUnitTest.java
+++
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherJUnitTest.java
@@ -14,9 +14,11 @@
*/
package org.apache.geode.internal.cache.wan;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@@ -24,11 +26,64 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Before;
import org.junit.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
import org.apache.geode.cache.client.internal.Connection;
+import org.apache.geode.cache.client.internal.Endpoint;
+import org.apache.geode.cache.client.internal.PoolImpl;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.ServerLocation;
+import org.apache.geode.distributed.internal.ServerLocationAndMemberId;
+import org.apache.geode.internal.cache.tier.sockets.ServerQueueStatus;
public class GatewaySenderEventRemoteDispatcherJUnitTest {
+
+ @Mock
+ private AbstractGatewaySender senderMock;
+
+ @Mock
+ private AbstractGatewaySenderEventProcessor eventProcessorMock;
+
+ @InjectMocks
+ private GatewaySenderEventRemoteDispatcher eventDispatcher;
+
+ @Mock
+ private PoolImpl poolMock;
+
+ @Mock
+ private Connection connectionMock;
+
+ @Mock
+ private ServerQueueStatus serverQueueStatusMock;
+
+ @Mock
+ private Endpoint endpointMock;
+
+ @Mock
+ private DistributedMember memberIdMock;
+
+ /**
+ * Initializes the annotated mocks and stubs the collaborators shared by the tests: the
+ * event processor returns the sender mock; the sender is non-parallel, supplies a lock
+ * object and exposes the pool mock as its proxy; the pool is not destroyed and hands out
+ * the connection mock, which in turn reports the queue-status mock.
+ */
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ when(eventProcessorMock.getSender()).thenReturn(senderMock);
+
+ when(senderMock.isParallel()).thenReturn(false);
+ when(senderMock.getLockForConcurrentDispatcher()).thenReturn(new Object());
+ when(senderMock.getProxy()).thenReturn(poolMock);
+
+ when(poolMock.isDestroyed()).thenReturn(false);
+ when(poolMock.acquireConnection()).thenReturn(connectionMock);
+
+ when(connectionMock.getQueueStatus()).thenReturn(serverQueueStatusMock);
+ }
+
@Test
public void
getConnectionShouldShutdownTheAckThreadReaderWhenEventProcessorIsShutDown() {
AbstractGatewaySender sender = mock(AbstractGatewaySender.class);
@@ -46,7 +101,7 @@ public class GatewaySenderEventRemoteDispatcherJUnitTest {
}
@Test
- public void
shuttingDownAckThreadReaderConnectionShouldshutdownTheAckThreadReader() {
+ public void
shuttingDownAckThreadReaderConnectionShouldShutdownTheAckThreadReader() {
AbstractGatewaySender sender = mock(AbstractGatewaySender.class);
AbstractGatewaySenderEventProcessor eventProcessor =
mock(AbstractGatewaySenderEventProcessor.class);
@@ -77,4 +132,161 @@ public class GatewaySenderEventRemoteDispatcherJUnitTest {
verify(dispatcher, times(1)).initializeConnection();
verify(dispatcher, times(2)).getConnectionLifeCycleLock();
}
+
+ @Test
+ public void initializeConnectionWithParallelSenderDoesNotRetryInitializeConnection() {
+ when(senderMock.isParallel()).thenReturn(true);
+
+ eventDispatcher = new GatewaySenderEventRemoteDispatcher(eventProcessorMock, null);
+ GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher);
+ dispatcherSpy.initializeConnection();
+
+ verify(senderMock, times(0)).getLockForConcurrentDispatcher();
+ verify(senderMock, times(1)).setServerLocation(any());
+ verify(poolMock, times(1)).acquireConnection();
+ verify(dispatcherSpy, times(0)).retryInitializeConnection(connectionMock);
+ }
+
+ @Test
+ public void initializeConnectionWithSerialSenderAndEnforceThreadsConnectSameRecieverFalseDoesNotRetryInitializeConnection() {
+ when(senderMock.getEnforceThreadsConnectSameReceiver()).thenReturn(false);
+
+ when(connectionMock.getEndpoint()).thenReturn(endpointMock);
+ when(endpointMock.getMemberId()).thenReturn(memberIdMock);
+ when(memberIdMock.getUniqueId()).thenReturn("receiverId");
+
+ eventDispatcher = new GatewaySenderEventRemoteDispatcher(eventProcessorMock, null);
+ GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher);
+ dispatcherSpy.initializeConnection();
+
+ verify(senderMock, times(1)).getLockForConcurrentDispatcher();
+ verify(senderMock, times(1)).getEnforceThreadsConnectSameReceiver();
+ verify(poolMock, times(1)).acquireConnection();
+ verify(dispatcherSpy, times(0)).retryInitializeConnection(connectionMock);
+ }
+
+ @Test
+ public void initializeConnectionWithSerialSenderAndEnforceThreadsConnectSameReceiverTrueAndNoExpectedReceiverIdSetsReceiverIdAndDoesNotReacquireConnection() {
+
+ when(senderMock.getEnforceThreadsConnectSameReceiver()).thenReturn(true);
+
+ when(connectionMock.getEndpoint()).thenReturn(endpointMock);
+ when(endpointMock.getMemberId()).thenReturn(memberIdMock);
+ when(memberIdMock.getUniqueId()).thenReturn("receiverId");
+ when(eventProcessorMock.getExpectedReceiverUniqueId()).thenReturn("");
+
+ eventDispatcher = new GatewaySenderEventRemoteDispatcher(eventProcessorMock, null);
+ GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher);
+ dispatcherSpy.initializeConnection();
+
+ verify(senderMock, times(1)).getLockForConcurrentDispatcher();
+ verify(senderMock, times(1)).getEnforceThreadsConnectSameReceiver();
+ verify(dispatcherSpy, times(1)).retryInitializeConnection(connectionMock);
+ verify(poolMock, times(1)).acquireConnection();
+ verify(eventProcessorMock, times(1)).setExpectedReceiverUniqueId("receiverId");
+ }
+
+ @Test
+ public void initializeConnectionWithSerialSenderAndEnforceThreadsConnectSameReceiverTrueAndConnectedToExpectedReceiverDoesNotReacquireConnection() {
+
+ when(senderMock.getEnforceThreadsConnectSameReceiver()).thenReturn(true);
+
+ when(connectionMock.getEndpoint()).thenReturn(endpointMock);
+ when(endpointMock.getMemberId()).thenReturn(memberIdMock);
+ when(memberIdMock.getUniqueId()).thenReturn("expectedId");
+ when(eventProcessorMock.getExpectedReceiverUniqueId()).thenReturn("expectedId");
+
+ eventDispatcher = new GatewaySenderEventRemoteDispatcher(eventProcessorMock, null);
+ GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher);
+ dispatcherSpy.initializeConnection();
+
+ verify(senderMock, times(1)).getLockForConcurrentDispatcher();
+ verify(senderMock, times(1)).getEnforceThreadsConnectSameReceiver();
+ verify(dispatcherSpy, times(1)).retryInitializeConnection(connectionMock);
+ verify(poolMock, times(1)).acquireConnection();
+ verify(eventProcessorMock, times(0)).setExpectedReceiverUniqueId(any());
+ }
+
+ @Test
+ public void initializeConnectionWithSerialSenderAndEnforceThreadsConnectSameReceiverTrueAndConnectedToExpectedReceiverOnSecondTryReacquiresConnectionOnce() {
+
+ when(senderMock.getEnforceThreadsConnectSameReceiver()).thenReturn(true);
+
+ when(connectionMock.getEndpoint()).thenReturn(endpointMock);
+ when(endpointMock.getMemberId()).thenReturn(memberIdMock);
+ when(memberIdMock.getUniqueId()).thenReturn("notExpectedId").thenReturn("expectedId");
+ when(eventProcessorMock.getExpectedReceiverUniqueId()).thenReturn("expectedId");
+
+ eventDispatcher = new GatewaySenderEventRemoteDispatcher(eventProcessorMock, null);
+ GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher);
+ dispatcherSpy.initializeConnection();
+
+ verify(senderMock, times(1)).getLockForConcurrentDispatcher();
+ verify(senderMock, times(1)).getEnforceThreadsConnectSameReceiver();
+ verify(dispatcherSpy, times(1)).retryInitializeConnection(connectionMock);
+ verify(poolMock, times(2)).acquireConnection();
+ verify(eventProcessorMock, times(0)).setExpectedReceiverUniqueId(any());
+
+ }
+
+ @Test
+ public void initializeConnectionWithSerialSenderAndEnforceThreadsConnectSameReceiverTrueAndMaxRetriesExceededAndNoServersAvailableThrowsException() {
+
+ when(senderMock.getEnforceThreadsConnectSameReceiver()).thenReturn(true);
+
+ when(connectionMock.getEndpoint()).thenReturn(endpointMock);
+ when(endpointMock.getMemberId()).thenReturn(memberIdMock);
+ when(memberIdMock.getUniqueId()).thenReturn("notExpectedId");
+ when(eventProcessorMock.getExpectedReceiverUniqueId()).thenReturn("expectedId");
+
+ eventDispatcher = new GatewaySenderEventRemoteDispatcher(eventProcessorMock, null);
+ GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher);
+
+ String expectedExceptionMessage =
+ "There are no active servers. "
+ + GatewaySenderEventRemoteDispatcher.maxAttemptsReachedConnectingServerIdExceptionMessage
+ + " [expectedId] (10 attempts)";
+ assertThatThrownBy(() -> {
+ dispatcherSpy.initializeConnection();
+ }).isInstanceOf(GatewaySenderException.class).hasMessageContaining(expectedExceptionMessage);
+
+ verify(senderMock, times(1)).getLockForConcurrentDispatcher();
+ verify(senderMock, times(2)).getEnforceThreadsConnectSameReceiver();
+ verify(dispatcherSpy, times(1)).retryInitializeConnection(connectionMock);
+ verify(poolMock, times(10)).acquireConnection();
+ verify(eventProcessorMock, times(0)).setExpectedReceiverUniqueId(any());
+ }
+
+ @Test
+ public void initializeConnectionWithSerialSenderAndEnforceThreadsConnectSameReceiverTrueAndMaxRetriesExceededAndServersAvailableThrowsException() {
+
+ when(senderMock.getEnforceThreadsConnectSameReceiver()).thenReturn(true);
+
+ when(connectionMock.getEndpoint()).thenReturn(endpointMock);
+ when(endpointMock.getMemberId()).thenReturn(memberIdMock);
+ when(memberIdMock.getUniqueId()).thenReturn("notExpectedId");
+ when(eventProcessorMock.getExpectedReceiverUniqueId()).thenReturn("expectedId");
+ List<ServerLocationAndMemberId> currentServers = new ArrayList<>();
+ currentServers.add(new ServerLocationAndMemberId(new ServerLocation("host1", 1), "id1"));
+ currentServers.add(new ServerLocationAndMemberId(new ServerLocation("host2", 2), "id2"));
+ when(poolMock.getCurrentServers()).thenReturn(currentServers);
+
+ eventDispatcher = new GatewaySenderEventRemoteDispatcher(eventProcessorMock, null);
+ GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher);
+
+ String expectedExceptionMessage =
+ "No available connection was found, but the following active servers exist: host1:1@id1, host2:2@id2 "
+ + GatewaySenderEventRemoteDispatcher.maxAttemptsReachedConnectingServerIdExceptionMessage
+ + " [expectedId] (10 attempts)";
+ assertThatThrownBy(() -> {
+ dispatcherSpy.initializeConnection();
+ }).isInstanceOf(GatewaySenderException.class).hasMessageContaining(expectedExceptionMessage);
+
+ verify(senderMock, times(1)).getLockForConcurrentDispatcher();
+ verify(senderMock, times(2)).getEnforceThreadsConnectSameReceiver();
+ verify(dispatcherSpy, times(1)).retryInitializeConnection(connectionMock);
+ verify(poolMock, times(10)).acquireConnection();
+ verify(eventProcessorMock, times(0)).setExpectedReceiverUniqueId(any());
+ }
+
}