Repository: geode
Updated Branches:
  refs/heads/develop ac0474d8b -> a1583458e


GEODE-3147: Set commBuffer for threads performing TXSynchronization calls when max-threads is set.

        Added the CommBufferPool interface, implemented by AcceptorImpl, to set
        and release the thread-local commBuffer.

        Added a unit test case that failed without this fix.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/a1583458
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/a1583458
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/a1583458

Branch: refs/heads/develop
Commit: a1583458e81b0a3fb3766948dd5b03af9fabf907
Parents: ac0474d
Author: eshu <e...@pivotal.io>
Authored: Thu Jun 29 17:03:44 2017 -0700
Committer: eshu <e...@pivotal.io>
Committed: Thu Jun 29 17:03:44 2017 -0700

----------------------------------------------------------------------
 .../cache/TXSynchronizationRunnable.java        | 15 +++++-
 .../cache/tier/sockets/AcceptorImpl.java        | 25 ++++++++--
 .../cache/tier/sockets/CommBufferPool.java      | 36 +++++++++++++++
 .../cache/tier/sockets/ServerConnection.java    |  4 +-
 .../command/TXSynchronizationCommand.java       |  3 +-
 .../internal/jta/ClientServerJTADUnitTest.java  | 48 ++++++++++++++++++--
 6 files changed, 118 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/a1583458/geode-core/src/main/java/org/apache/geode/internal/cache/TXSynchronizationRunnable.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXSynchronizationRunnable.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXSynchronizationRunnable.java
index 35b0e75..884f281 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXSynchronizationRunnable.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXSynchronizationRunnable.java
@@ -15,7 +15,7 @@
 package org.apache.geode.internal.cache;
 
 import org.apache.logging.log4j.Logger;
-
+import org.apache.geode.internal.cache.tier.sockets.CommBufferPool;
 import org.apache.geode.internal.logging.LogService;
 
 /**
@@ -38,12 +38,23 @@ public class TXSynchronizationRunnable implements Runnable {
   private boolean secondRunnableCompleted;
 
   private boolean abort;
+  private final CommBufferPool commBufferPool;
 
-  public TXSynchronizationRunnable(Runnable beforeCompletion) {
+  public TXSynchronizationRunnable(Runnable beforeCompletion, final 
CommBufferPool commBufferPool) {
     this.firstRunnable = beforeCompletion;
+    this.commBufferPool = commBufferPool;
   }
 
   public void run() {
+    commBufferPool.setTLCommBuffer();
+    try {
+      doSynchronizationOps();
+    } finally {
+      commBufferPool.releaseTLCommBuffer();
+    }
+  }
+
+  private void doSynchronizationOps() {
     synchronized (this.firstRunnableSync) {
       try {
         this.firstRunnable.run();

http://git-wip-us.apache.org/repos/asf/geode/blob/a1583458/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index 472af09..3c424d3 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -98,7 +98,7 @@ import javax.net.ssl.SSLException;
  * @since GemFire 2.0.2
  */
 @SuppressWarnings("deprecation")
-public class AcceptorImpl extends Acceptor implements Runnable {
+public class AcceptorImpl extends Acceptor implements Runnable, CommBufferPool 
{
   private static final Logger logger = LogService.getLogger();
 
   private static final boolean isJRockit = 
System.getProperty("java.vm.name").contains("JRockit");
@@ -1373,7 +1373,7 @@ public class AcceptorImpl extends Acceptor implements 
Runnable {
     }
   }
 
-  public ByteBuffer takeCommBuffer() {
+  private ByteBuffer takeCommBuffer() {
     ByteBuffer result = (ByteBuffer) this.commBufferQueue.poll();
     if (result == null) {
       result = ByteBuffer.allocateDirect(this.socketBufferSize);
@@ -1381,7 +1381,7 @@ public class AcceptorImpl extends Acceptor implements 
Runnable {
     return result;
   }
 
-  public void releaseCommBuffer(ByteBuffer bb) {
+  private void releaseCommBuffer(ByteBuffer bb) {
     if (bb == null) { // fix for bug 37107
       return;
     }
@@ -1791,4 +1791,23 @@ public class AcceptorImpl extends Acceptor implements 
Runnable {
   public ServerConnection[] getAllServerConnectionList() {
     return this.allSCList;
   }
+
+  @Override
+  public void setTLCommBuffer() {
+    // The thread local will only be set if maxThreads has been set.
+    if (!isSelector()) {
+      return;
+    }
+
+    Message.setTLCommBuffer(takeCommBuffer());
+  }
+
+  @Override
+  public void releaseTLCommBuffer() {
+    if (!isSelector()) {
+      return;
+    }
+
+    releaseCommBuffer(Message.setTLCommBuffer(null));
+  }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/a1583458/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CommBufferPool.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CommBufferPool.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CommBufferPool.java
new file mode 100644
index 0000000..de3189e
--- /dev/null
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CommBufferPool.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.cache.tier.sockets;
+
+/**
+ * Defines the comm buffer pool interface which can set a comm buffer in a 
ThreadLocal and return
+ * the buffer back to the comm buffer queue.
+ *
+ */
+public interface CommBufferPool {
+
+  /**
+   * Set a comm buffer in a ThreadLocal.
+   * 
+   */
+  public void setTLCommBuffer();
+
+  /**
+   * Release the ThreadLocal comm buffer back to the queue.
+   * 
+   */
+  public void releaseTLCommBuffer();
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/a1583458/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index ebc9dab..8704dad 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -1118,7 +1118,7 @@ public abstract class ServerConnection implements 
Runnable {
       try {
         this.stats.decThreadQueueSize();
         if (!isTerminated()) {
-          Message.setTLCommBuffer(getAcceptor().takeCommBuffer());
+          getAcceptor().setTLCommBuffer();
           doOneMessage();
           if (this.processMessages && !(this.crHelper.isShutdown())) {
             registerWithSelector(); // finished msg so reregister
@@ -1134,7 +1134,7 @@ public abstract class ServerConnection implements 
Runnable {
             
LocalizedMessage.create(LocalizedStrings.ServerConnection_0__UNEXPECTED_EXCEPTION,
 ex));
         setClientDisconnectedException(ex);
       } finally {
-        getAcceptor().releaseCommBuffer(Message.setTLCommBuffer(null));
+        getAcceptor().releaseTLCommBuffer();
         // DistributedSystem.releaseThreadsSockets();
         unsetOwner();
         setNotProcessingMessage();

http://git-wip-us.apache.org/repos/asf/geode/blob/a1583458/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
index b1b0cfb..eb70700 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
@@ -142,7 +142,8 @@ public class TXSynchronizationCommand extends BaseCommand {
               }
             }
           };
-          TXSynchronizationRunnable sync = new 
TXSynchronizationRunnable(beforeCompletion);
+          TXSynchronizationRunnable sync =
+              new TXSynchronizationRunnable(beforeCompletion, 
serverConnection.getAcceptor());
           txProxy.setSynchronizationRunnable(sync);
           Executor exec = 
InternalDistributedSystem.getConnectedInstance().getDistributionManager()
               .getWaitingThreadPool();

http://git-wip-us.apache.org/repos/asf/geode/blob/a1583458/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java
index 51ef44a..ddf08d0 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java
@@ -49,19 +49,19 @@ public class ClientServerJTADUnitTest extends 
JUnit4CacheTestCase {
   private String key = "key";
   private String value = "value";
   private String newValue = "newValue";
+  final Host host = Host.getHost(0);
+  final VM server = host.getVM(0);
+  final VM client = host.getVM(1);
 
   @Test
   public void testClientTXStateStubBeforeCompletion() throws Exception {
-    final Host host = Host.getHost(0);
-    final VM server = host.getVM(0);
-    final VM client = host.getVM(1);
     final String regionName = getUniqueName();
     getBlackboard().initBlackboard();
     final Properties properties = getDistributedSystemProperties();
 
     final int port = server.invoke("create cache", () -> {
       Cache cache = getCache(properties);
-      CacheServer cacheServer = createCacheServer(cache);
+      CacheServer cacheServer = createCacheServer(cache, 0);
       Region region = 
cache.createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
       region.put(key, value);
 
@@ -106,8 +106,9 @@ public class ClientServerJTADUnitTest extends 
JUnit4CacheTestCase {
     }
   }
 
-  private CacheServer createCacheServer(Cache cache) {
+  private CacheServer createCacheServer(Cache cache, int maxThreads) {
     CacheServer server = cache.addCacheServer();
+    server.setMaxThreads(maxThreads);
     server.setPort(AvailablePortHelper.getRandomAvailableTCPPort());
     try {
       server.start();
@@ -140,4 +141,41 @@ public class ClientServerJTADUnitTest extends 
JUnit4CacheTestCase {
     }
     txStub.afterCompletion(Status.STATUS_COMMITTED);
   }
+
+  @Test
+  public void testJTAMaxThreads() throws TimeoutException, 
InterruptedException {
+    testJTAWithMaxThreads(1);
+  }
+
+  @Test
+  public void testJTANoMaxThreadsSetting() throws TimeoutException, 
InterruptedException {
+    testJTAWithMaxThreads(0);
+  }
+
+  private void testJTAWithMaxThreads(int maxThreads) {
+    final String regionName = getUniqueName();
+    getBlackboard().initBlackboard();
+    final Properties properties = getDistributedSystemProperties();
+
+    final int port = server.invoke("create cache", () -> {
+      Cache cache = getCache(properties);
+      CacheServer cacheServer = createCacheServer(cache, maxThreads);
+      Region region = 
cache.createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
+      region.put(key, value);
+
+      return cacheServer.getPort();
+    });
+
+    createClientRegion(host, port, regionName);
+
+    Region region = getCache().getRegion(regionName);
+    assertTrue(region.get(key).equals(value));
+
+    try {
+      commitTxWithBeforeCompletion(regionName, false, null, null);
+    } catch (Exception e) {
+      Assert.fail("got unexpected exception", e);
+    }
+    assertTrue(region.get(key).equals(newValue));
+  }
 }

Reply via email to