GEODE-3147: Set commBuffer for threads performing TXSynchronization calls when
max-threads is set.
Added an interface that is implemented by AcceptorImpl to set and
release commBuffer.
Added a unit test case that failed without this fix.
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/a1583458
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/a1583458
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/a1583458
Branch: refs/heads/feature/GEODE-3109
Commit: a1583458e81b0a3fb3766948dd5b03af9fabf907
Parents: ac0474d
Author: eshu <[email protected]>
Authored: Thu Jun 29 17:03:44 2017 -0700
Committer: eshu <[email protected]>
Committed: Thu Jun 29 17:03:44 2017 -0700
----------------------------------------------------------------------
.../cache/TXSynchronizationRunnable.java | 15 +++++-
.../cache/tier/sockets/AcceptorImpl.java | 25 ++++++++--
.../cache/tier/sockets/CommBufferPool.java | 36 +++++++++++++++
.../cache/tier/sockets/ServerConnection.java | 4 +-
.../command/TXSynchronizationCommand.java | 3 +-
.../internal/jta/ClientServerJTADUnitTest.java | 48 ++++++++++++++++++--
6 files changed, 118 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/a1583458/geode-core/src/main/java/org/apache/geode/internal/cache/TXSynchronizationRunnable.java
----------------------------------------------------------------------
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXSynchronizationRunnable.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXSynchronizationRunnable.java
index 35b0e75..884f281 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXSynchronizationRunnable.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXSynchronizationRunnable.java
@@ -15,7 +15,7 @@
package org.apache.geode.internal.cache;
import org.apache.logging.log4j.Logger;
-
+import org.apache.geode.internal.cache.tier.sockets.CommBufferPool;
import org.apache.geode.internal.logging.LogService;
/**
@@ -38,12 +38,23 @@ public class TXSynchronizationRunnable implements Runnable {
private boolean secondRunnableCompleted;
private boolean abort;
+ private final CommBufferPool commBufferPool;
- public TXSynchronizationRunnable(Runnable beforeCompletion) {
+ public TXSynchronizationRunnable(Runnable beforeCompletion, final CommBufferPool commBufferPool) {
this.firstRunnable = beforeCompletion;
+ this.commBufferPool = commBufferPool;
}
public void run() {
+ commBufferPool.setTLCommBuffer();
+ try {
+ doSynchronizationOps();
+ } finally {
+ commBufferPool.releaseTLCommBuffer();
+ }
+ }
+
+ private void doSynchronizationOps() {
synchronized (this.firstRunnableSync) {
try {
this.firstRunnable.run();
http://git-wip-us.apache.org/repos/asf/geode/blob/a1583458/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
----------------------------------------------------------------------
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index 472af09..3c424d3 100755
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -98,7 +98,7 @@ import javax.net.ssl.SSLException;
* @since GemFire 2.0.2
*/
@SuppressWarnings("deprecation")
-public class AcceptorImpl extends Acceptor implements Runnable {
+public class AcceptorImpl extends Acceptor implements Runnable, CommBufferPool {
private static final Logger logger = LogService.getLogger();
private static final boolean isJRockit =
System.getProperty("java.vm.name").contains("JRockit");
@@ -1373,7 +1373,7 @@ public class AcceptorImpl extends Acceptor implements Runnable {
}
}
- public ByteBuffer takeCommBuffer() {
+ private ByteBuffer takeCommBuffer() {
ByteBuffer result = (ByteBuffer) this.commBufferQueue.poll();
if (result == null) {
result = ByteBuffer.allocateDirect(this.socketBufferSize);
@@ -1381,7 +1381,7 @@ public class AcceptorImpl extends Acceptor implements Runnable {
return result;
}
- public void releaseCommBuffer(ByteBuffer bb) {
+ private void releaseCommBuffer(ByteBuffer bb) {
if (bb == null) { // fix for bug 37107
return;
}
@@ -1791,4 +1791,23 @@ public class AcceptorImpl extends Acceptor implements Runnable {
public ServerConnection[] getAllServerConnectionList() {
return this.allSCList;
}
+
+ @Override
+ public void setTLCommBuffer() {
+ // The thread local will only be set if maxThreads has been set.
+ if (!isSelector()) {
+ return;
+ }
+
+ Message.setTLCommBuffer(takeCommBuffer());
+ }
+
+ @Override
+ public void releaseTLCommBuffer() {
+ if (!isSelector()) {
+ return;
+ }
+
+ releaseCommBuffer(Message.setTLCommBuffer(null));
+ }
}
http://git-wip-us.apache.org/repos/asf/geode/blob/a1583458/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CommBufferPool.java
----------------------------------------------------------------------
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CommBufferPool.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CommBufferPool.java
new file mode 100644
index 0000000..de3189e
--- /dev/null
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CommBufferPool.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.cache.tier.sockets;
+
+/**
+ * Defines the comm buffer pool interface which can set a comm buffer in a ThreadLocal and return
+ * the buffer back to the comm buffer queue.
+ *
+ */
+public interface CommBufferPool {
+
+ /**
+ * Set a comm buffer in a ThreadLocal.
+ *
+ */
+ public void setTLCommBuffer();
+
+ /**
+ * Release the ThreadLocal comm buffer back to the queue.
+ *
+ */
+ public void releaseTLCommBuffer();
+}
http://git-wip-us.apache.org/repos/asf/geode/blob/a1583458/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
----------------------------------------------------------------------
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index ebc9dab..8704dad 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -1118,7 +1118,7 @@ public abstract class ServerConnection implements Runnable {
try {
this.stats.decThreadQueueSize();
if (!isTerminated()) {
- Message.setTLCommBuffer(getAcceptor().takeCommBuffer());
+ getAcceptor().setTLCommBuffer();
doOneMessage();
if (this.processMessages && !(this.crHelper.isShutdown())) {
registerWithSelector(); // finished msg so reregister
@@ -1134,7 +1134,7 @@ public abstract class ServerConnection implements Runnable {
LocalizedMessage.create(LocalizedStrings.ServerConnection_0__UNEXPECTED_EXCEPTION, ex));
setClientDisconnectedException(ex);
} finally {
- getAcceptor().releaseCommBuffer(Message.setTLCommBuffer(null));
+ getAcceptor().releaseTLCommBuffer();
// DistributedSystem.releaseThreadsSockets();
unsetOwner();
setNotProcessingMessage();
http://git-wip-us.apache.org/repos/asf/geode/blob/a1583458/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
----------------------------------------------------------------------
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
index b1b0cfb..eb70700 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
@@ -142,7 +142,8 @@ public class TXSynchronizationCommand extends BaseCommand {
}
}
};
- TXSynchronizationRunnable sync = new TXSynchronizationRunnable(beforeCompletion);
+ TXSynchronizationRunnable sync =
+ new TXSynchronizationRunnable(beforeCompletion, serverConnection.getAcceptor());
txProxy.setSynchronizationRunnable(sync);
Executor exec = InternalDistributedSystem.getConnectedInstance().getDistributionManager()
.getWaitingThreadPool();
http://git-wip-us.apache.org/repos/asf/geode/blob/a1583458/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java
----------------------------------------------------------------------
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java
b/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java
index 51ef44a..ddf08d0 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java
@@ -49,19 +49,19 @@ public class ClientServerJTADUnitTest extends JUnit4CacheTestCase {
private String key = "key";
private String value = "value";
private String newValue = "newValue";
+ final Host host = Host.getHost(0);
+ final VM server = host.getVM(0);
+ final VM client = host.getVM(1);
@Test
public void testClientTXStateStubBeforeCompletion() throws Exception {
- final Host host = Host.getHost(0);
- final VM server = host.getVM(0);
- final VM client = host.getVM(1);
final String regionName = getUniqueName();
getBlackboard().initBlackboard();
final Properties properties = getDistributedSystemProperties();
final int port = server.invoke("create cache", () -> {
Cache cache = getCache(properties);
- CacheServer cacheServer = createCacheServer(cache);
+ CacheServer cacheServer = createCacheServer(cache, 0);
Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
region.put(key, value);
@@ -106,8 +106,9 @@ public class ClientServerJTADUnitTest extends JUnit4CacheTestCase {
}
}
- private CacheServer createCacheServer(Cache cache) {
+ private CacheServer createCacheServer(Cache cache, int maxThreads) {
CacheServer server = cache.addCacheServer();
+ server.setMaxThreads(maxThreads);
server.setPort(AvailablePortHelper.getRandomAvailableTCPPort());
try {
server.start();
@@ -140,4 +141,41 @@ public class ClientServerJTADUnitTest extends JUnit4CacheTestCase {
}
txStub.afterCompletion(Status.STATUS_COMMITTED);
}
+
+ @Test
+ public void testJTAMaxThreads() throws TimeoutException, InterruptedException {
+ testJTAWithMaxThreads(1);
+ }
+
+ @Test
+ public void testJTANoMaxThreadsSetting() throws TimeoutException, InterruptedException {
+ testJTAWithMaxThreads(0);
+ }
+
+ private void testJTAWithMaxThreads(int maxThreads) {
+ final String regionName = getUniqueName();
+ getBlackboard().initBlackboard();
+ final Properties properties = getDistributedSystemProperties();
+
+ final int port = server.invoke("create cache", () -> {
+ Cache cache = getCache(properties);
+ CacheServer cacheServer = createCacheServer(cache, maxThreads);
+ Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
+ region.put(key, value);
+
+ return cacheServer.getPort();
+ });
+
+ createClientRegion(host, port, regionName);
+
+ Region region = getCache().getRegion(regionName);
+ assertTrue(region.get(key).equals(value));
+
+ try {
+ commitTxWithBeforeCompletion(regionName, false, null, null);
+ } catch (Exception e) {
+ Assert.fail("got unexpected exception", e);
+ }
+ assertTrue(region.get(key).equals(newValue));
+ }
}