[ 
https://issues.apache.org/jira/browse/GEODE-9538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Juan Ramos updated GEODE-9538:
------------------------------
    Description: 
I've hit this issue while executing some chaos testing over a GemFire cluster 
using 2 locators and 3 servers; {{SSL}} is enabled and a dummy 
{{SecurityManager}} is configured to authenticate and authorize a 
pre-configured set of well-known users.
There are 3 {{PARTITION_REDUNDANT}} regions configured, one per client, each 
with 1 redundant copy. Once the cluster is up and running, 3 clients 
continuously execute {{Region.get}} and {{Region.put}} operations on a known 
set of keys, each against its own {{Region}} (created with {{PROXY}} type), 
while another process executes the following logic in parallel (pseudocode):
{noformat}
for SERVER_PID in ${servers}
do
        # Pause the JVM for 30 seconds to simulate a stop-the-world GC
        kill -STOP "${SERVER_PID}"
        sleep 30

        # Unpause the JVM, then wait for the member to reconnect and for the
        # regions to recover their configured redundancy
        kill -CONT "${SERVER_PID}"
        waitForReconnectcompletedInServerLog
        waitForNumBucketsWithoutRedundancyToBeZeroInGfshShowRegionMetrics
done
{noformat}
The test works fine most of the time, but randomly fails due to an unexpected 
exception in the logs of at least one server. The exception is always 
reported from a {{ServerConnection}} thread on the server member that has 
just returned to life, for example:
{noformat}
[info 2021/08/09 11:01:07.430 GMT system-test-gemfire-server-2 <Pooled Waiting 
Message Processor 11> tid=0x8d] Configured redundancy of 2 copies has been 
restored to /system-test-client-7f6795dfb8-v7hh8-region

[warn 2021/08/09 11:02:19.742 GMT system-test-gemfire-server-2 
<ClientHealthMonitor Thread> tid=0x4d] Server connection from 
[identity(system-test-client-7f6795dfb8-pc8mv(SpringBasedClientCacheApplication:1:loner):34788:814b8d2a:SpringBasedClientCacheApplication,connection=1;
 port=50264] is being terminated because its client timeout of 10000 has 
expired.

[warn 2021/08/09 11:02:19.744 GMT system-test-gemfire-server-2 
<ClientHealthMonitor Thread> tid=0x4d] ClientHealthMonitor: Unregistering 
client with member id 
identity(system-test-client-7f6795dfb8-pc8mv(SpringBasedClientCacheApplication:1:loner):34788:814b8d2a:SpringBasedClientCacheApplication,connection=1
 due to: Unknown reason

[info 2021/08/09 11:02:19.745 GMT system-test-gemfire-server-2 <unicast 
receiver,system-test-gemfire-server-2-56622> tid=0x1e] received suspect message 
from 
system-test-gemfire-locator-0(system-test-gemfire-locator-0:1:locator)<ec><v1>:41000
 for system-test-gemfire-server-2(system-test-gemfire-server-2:1)<v3>:41000: 
Member isn't responding to heartbeat requests

[info 2021/08/09 11:02:19.747 GMT system-test-gemfire-server-2 <unicast 
receiver,system-test-gemfire-server-2-56622> tid=0x1e] Membership received a 
request to remove 
system-test-gemfire-server-2(system-test-gemfire-server-2:1)<v3>:41000 from 
system-test-gemfire-locator-1(system-test-gemfire-locator-1:1:locator)<ec><v0>:41000
 reason=Member isn't responding to heartbeat requests

[warn 2021/08/09 11:02:19.748 GMT system-test-gemfire-server-2 <StatSampler> 
tid=0x38] Statistics sampling thread detected a wakeup delay of 29965 ms, 
indicating a possible resource issue. Check the GC, memory, and CPU statistics.

...

[warn 2021/08/09 11:02:19.854 GMT system-test-gemfire-server-2 
<ServerConnection on port 40404 Thread 9> tid=0x91] ClientHealthMonitor: 
Unregistering client with member id 
identity(system-test-client-7f6795dfb8-v7hh8(SpringBasedClientCacheApplication:1:loner):44012:ec3c8d2a:SpringBasedClientCacheApplication,connection=1
 due to: The connection has been reset while reading the header

[info 2021/08/09 11:02:19.867 GMT system-test-gemfire-server-2 <unicast 
receiver,system-test-gemfire-server-2-56622> tid=0x1e] saving cache server 
configuration for use with the cluster-configuration service on reconnect

[info 2021/08/09 11:02:19.867 GMT system-test-gemfire-server-2 <unicast 
receiver,system-test-gemfire-server-2-56622> tid=0x1e] cache server 
configuration saved

[fatal 2021/08/09 11:02:19.876 GMT system-test-gemfire-server-2 
<ServerConnection on port 40404 Thread 14> tid=0xa9] Uncaught exception in 
thread Thread[ServerConnection on port 40404 Thread 14,5,main]
java.lang.NullPointerException
        at 
org.apache.geode.internal.cache.tier.sockets.ServerConnection.doNormalMessage(ServerConnection.java:865)
        at 
org.apache.geode.internal.cache.tier.sockets.ServerConnection.doOneMessage(ServerConnection.java:1022)
        at 
org.apache.geode.internal.cache.tier.sockets.ServerConnection.run(ServerConnection.java:1275)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at 
org.apache.geode.internal.cache.tier.sockets.AcceptorImpl.lambda$initializeServerConnectionThreadPool$3(AcceptorImpl.java:690)
        at 
org.apache.geode.logging.internal.executors.LoggingThreadFactory.lambda$newThread$0(LoggingThreadFactory.java:120)
        at java.base/java.lang.Thread.run(Thread.java:829)
{noformat}
The problem itself is really hard to reproduce: we only hit it twice in around 
200 runs. Logs and statistics can be found 
[here|https://drive.google.com/file/d/1yHrEQYE2i2Hf6WJY2Dwyj2GqVoCJUoWf/view?usp=sharing].
Even though the logs show little beyond what I've included above, I believe 
there's a race condition between the {{handleTermination()}} and 
{{doNormalMessage()}} methods within the {{ServerConnection}} class. One of 
the last tasks within {{handleTermination}} is to set the {{clientUserAuths}} 
attribute to {{null}}, and I haven't found any synchronization around this.
{noformat}
void handleTermination(boolean timedOut) {
...
    if (unregisterClient) {
      // last serverconnection call all close on auth objects
      cleanClientAuths();
    }
    clientUserAuths = null;
    if (needsUnregister) {
      acceptor.getClientHealthMonitor().removeConnection(proxyId, this);
      if (unregisterClient) {
        acceptor.getClientHealthMonitor().unregisterClient(proxyId, getAcceptor(),
            clientDisconnectedCleanly, clientDisconnectedException);
      }
    }
...
}
{noformat}
I might be wrong here, but I think that if {{handleTermination}} runs 
interleaved with {{doNormalMessage}}, specifically _*after*_ we check whether 
the member is shutting down but _*before*_ we get the actual subject, the 
{{clientUserAuths}} attribute might already have been set to {{null}} by 
{{handleTermination}}, and so the {{NullPointerException}} is thrown.
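
To make the suspected interleaving concrete, here is a minimal, self-contained sketch. It is not Geode code: {{Connection}}, {{userAuths}}, {{terminate()}} and the {{betweenCheckAndUse}} hook are hypothetical stand-ins for {{ServerConnection}}, {{clientUserAuths}}, {{handleTermination()}} and the window between the two steps in {{doNormalMessage()}}. It runs on a single thread and simply shows an unguarded read failing while a read through a local snapshot does not; whether a snapshot is the right fix for Geode is an assumption I haven't verified.

```java
import java.util.HashMap;
import java.util.Map;

public class CheckThenActSketch {

  /** Hypothetical stand-in for ServerConnection; not the real Geode class. */
  static class Connection {
    // Stand-in for clientUserAuths: nulled by terminate(), read by the message path.
    private volatile Map<Long, String> userAuths = new HashMap<>();
    private volatile boolean terminated;

    Connection() {
      userAuths.put(1L, "subject-1");
    }

    /** Stand-in for handleTermination(): its last step nulls the field. */
    void terminate() {
      terminated = true;
      userAuths = null;
    }

    /** Unguarded: checks the flag, then dereferences the field directly. */
    String processUnguarded(long id, Runnable betweenCheckAndUse) {
      if (terminated) {
        return null;
      }
      betweenCheckAndUse.run(); // the window in which termination can sneak in
      return userAuths.get(id); // NullPointerException if terminate() ran in the window
    }

    /** Guarded: reads the field once into a local and null-checks the local. */
    String processGuarded(long id, Runnable betweenCheckAndUse) {
      if (terminated) {
        return null;
      }
      betweenCheckAndUse.run();
      Map<Long, String> snapshot = userAuths;
      return snapshot == null ? null : snapshot.get(id);
    }
  }

  public static void main(String[] args) {
    Connection first = new Connection();
    try {
      first.processUnguarded(1L, first::terminate);
      System.out.println("unguarded: no exception");
    } catch (NullPointerException e) {
      System.out.println("unguarded: NullPointerException");
    }

    Connection second = new Connection();
    System.out.println("guarded: " + second.processGuarded(1L, second::terminate));
  }
}
```

With termination injected into the window, the unguarded path throws and the guarded path returns {{null}} instead. A real fix would still have to decide what the caller does with that {{null}}.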

—

Below is a small test that can be used to easily reproduce the issue:
{noformat}
package org.apache.geode.internal.cache.tier.sockets;

import static org.assertj.core.api.Assertions.assertThatCode;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.mockito.quality.Strictness.LENIENT;

import java.net.InetAddress;
import java.net.Socket;
import java.util.Random;

import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;

import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.TXManagerImpl;
import org.apache.geode.internal.cache.tier.Acceptor;
import org.apache.geode.internal.cache.tier.CachedRegionHelper;
import org.apache.geode.internal.cache.tier.CommunicationMode;
import org.apache.geode.internal.cache.tier.MessageType;
import org.apache.geode.internal.cache.tier.ServerSideHandshake;
import org.apache.geode.internal.monitoring.ThreadsMonitoring;
import org.apache.geode.internal.security.SecurityService;
import org.apache.geode.internal.serialization.KnownVersion;
import org.apache.geode.test.junit.categories.ClientServerTest;

@Category(ClientServerTest.class)
public class ServerConnectionNullPointerExceptionIntegrationTest {
  @Rule
  public MockitoRule mockitoRule = MockitoJUnit.rule().strictness(LENIENT);

  @Test
  public void nullPointerExceptionNotThrownWhenHandleTerminationIsInvokedInterleavedWithDoOneMessage() {
    InetAddress inetAddress = mock(InetAddress.class);
    AcceptorImpl acceptor = mock(AcceptorImpl.class);
    Socket socket = mock(Socket.class);
    InternalCache cache = mock(InternalCache.class);
    CachedRegionHelper cachedRegionHelper = mock(CachedRegionHelper.class);
    SecurityService securityService = mock(SecurityService.class);
    when(securityService.isIntegratedSecurity()).thenReturn(true);
    CacheServerStats stats = mock(CacheServerStats.class);

    when(inetAddress.getHostAddress()).thenReturn("localhost");
    when(socket.getInetAddress()).thenReturn(inetAddress);

    InternalDistributedSystem internalDistributedSystem =
        mock(InternalDistributedSystem.class);
    DistributionManager distributionManager = mock(DistributionManager.class);
    ThreadsMonitoring threadsMonitoring = mock(ThreadsMonitoring.class);

    when(cachedRegionHelper.getCache()).thenReturn(cache);
    when(cache.getInternalDistributedSystem()).thenReturn(internalDistributedSystem);
    when(internalDistributedSystem.getDM()).thenReturn(distributionManager);
    when(distributionManager.getThreadMonitoring()).thenReturn(threadsMonitoring);

    when(cache.getCacheTransactionManager()).thenReturn(mock(TXManagerImpl.class));

    ServerConnectionCollection mockCollection =
        spy(ServerConnectionCollection.class);
    when(mockCollection.incrementConnectionsProcessing()).thenReturn(true);

    ClientHealthMonitor mockClientHealthMonitor =
        mock(ClientHealthMonitor.class);
    when(mockClientHealthMonitor.addConnection(any(), any())).thenReturn(mockCollection);
    when(acceptor.getClientHealthMonitor()).thenReturn(mockClientHealthMonitor);
    when(acceptor.getConnectionListener()).thenReturn(mock(ConnectionListener.class));

    TestServerConnection testServerConnection =
        new TestServerConnection(socket, cache, cachedRegionHelper, stats, 0, 0, null,
            CommunicationMode.PrimaryServerToClient.getModeNumber(), acceptor,
            securityService);

    assertThatCode(testServerConnection::run).doesNotThrowAnyException();
  }

  private static class TestMessage extends Message {
    TestMessage() {
      super(3, KnownVersion.CURRENT);
      messageType = MessageType.REQUEST;
      securePart = new Part();
    }

    @Override
    public void receive() {
    }
  }

  private static class TestServerConnection extends ServerConnection {
    TestServerConnection(Socket socket, InternalCache internalCache,
        CachedRegionHelper cachedRegionHelper, CacheServerStats stats, int hsTimeout,
        int socketBufferSize, String communicationModeStr, byte communicationMode,
        Acceptor acceptor, SecurityService securityService) {
      super(socket, internalCache, cachedRegionHelper, stats, hsTimeout, socketBufferSize,
          communicationModeStr, communicationMode, acceptor, securityService);
      setClientDisconnectCleanly();
    }

    @Override
    protected void doHandshake() {
      ClientProxyMembershipID proxyID = mock(ClientProxyMembershipID.class);
      ServerSideHandshake handshake = mock(ServerSideHandshake.class);
      MessageIdExtractor extractor = mock(MessageIdExtractor.class);
      when(handshake.getVersion()).thenReturn(KnownVersion.CURRENT);
      when(proxyID.getDistributedMember()).thenReturn(mock(InternalDistributedMember.class));
      setHandshake(handshake);
      setProxyId(proxyID);
      processHandShake();
      setFakeRequest();

      super.doHandshake();
    }

    @Override
    // Naive approach to simulate that handleTermination was invoked in between
    public long getUniqueId() {
      handleTermination();
      return new Random().nextLong();
    }

    private void setFakeRequest() {
      TestMessage testMessage = new TestMessage();
      setRequestMessage(testMessage);
    }
  }
}
{noformat}
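
The {{getUniqueId()}} override above forces the interleaving from a single thread. If a two-thread variant is preferred, the same window can be opened deterministically with latches. The sketch below is generic Java, not Geode code: {{Worker}}, {{state}} and the two latches are hypothetical names, and it only illustrates the technique of pinning one thread between its check and its use while a second thread clears the field.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class LatchInterleavingSketch {

  /** Hypothetical worker with a nullable field, mimicking the check-then-use shape. */
  static class Worker {
    private volatile String state = "auths";
    private final CountDownLatch checked = new CountDownLatch(1);
    private final CountDownLatch cleared = new CountDownLatch(1);

    /** Message path: passes the check, waits for the terminator, then uses the field. */
    int process() throws InterruptedException {
      if (state == null) {
        return -1;
      }
      checked.countDown(); // tell the terminator that the check has passed
      if (!cleared.await(5, TimeUnit.SECONDS)) {
        throw new IllegalStateException("terminator never ran");
      }
      return state.length(); // state is null by now, so this throws NullPointerException
    }

    /** Termination path: waits until the check has passed, then nulls the field. */
    void terminate() throws InterruptedException {
      if (!checked.await(5, TimeUnit.SECONDS)) {
        throw new IllegalStateException("message path never reached the check");
      }
      state = null;
      cleared.countDown();
    }
  }

  /** Runs both paths on separate threads; returns what the message path threw, if anything. */
  static Throwable runInterleaved() {
    Worker worker = new Worker();
    AtomicReference<Throwable> failure = new AtomicReference<>();

    Thread messageThread = new Thread(() -> {
      try {
        worker.process();
      } catch (Throwable t) {
        failure.set(t);
      }
    });
    Thread terminationThread = new Thread(() -> {
      try {
        worker.terminate();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    messageThread.start();
    terminationThread.start();
    try {
      messageThread.join();
      terminationThread.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for the threads", e);
    }
    return failure.get();
  }

  public static void main(String[] args) {
    System.out.println("message path threw: " + runInterleaved());
  }
}
```

Because the latches fix the order (check, clear, use), the message path fails on every run rather than twice in 200, which is the property a regression test needs.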

  was:
(Same text as the description above, except that the sentence about the logs read: "Logs and statistics are attached to the ticket.")


> NullPointerException in ServerConnection.doNormalMessage()
> ----------------------------------------------------------
>
>                 Key: GEODE-9538
>                 URL: https://issues.apache.org/jira/browse/GEODE-9538
>             Project: Geode
>          Issue Type: Bug
>          Components: membership
>            Reporter: Juan Ramos
>            Priority: Major
>
> I've hit this issue while executing some chaos testing over a GemFire cluster 
> using 2 locators and 3 servers; {{SSL}} is enabled and a dummy 
> {{SecurityManager}} is configured to authenticate and authorize a 
> pre-configured set of well-known users.
> There are 3 {{PARTITION_REDUNDANT}} regions configured, one per client, each 
> with 1 redundant copy. Once the cluster is up and running, 3 clients 
> continuously execute {{Region.get}} and {{Region.put}} operations on a known 
> set of keys, each against its own {{Region}} (created with {{PROXY}} type), 
> while another process executes the following logic in parallel (pseudocode):
> {noformat}
> for SERVER_PID in ${servers}
> do
>       # Pause the JVM for 30 seconds to simulate a stop-the-world GC
>       kill -STOP "${SERVER_PID}"
>       sleep 30
>       # Unpause the JVM, then wait for the member to reconnect and for the
>       # regions to recover their configured redundancy
>       kill -CONT "${SERVER_PID}"
>       waitForReconnectcompletedInServerLog
>       waitForNumBucketsWithoutRedundancyToBeZeroInGfshShowRegionMetrics
> done
> {noformat}
> The test works fine most of the time, but randomly fails due to an unexpected 
> exception in the logs of at least one server. The exception is always 
> reported from a {{ServerConnection}} thread on the server member that has 
> just returned to life, for example:
> {noformat}
> [info 2021/08/09 11:01:07.430 GMT system-test-gemfire-server-2 <Pooled 
> Waiting Message Processor 11> tid=0x8d] Configured redundancy of 2 copies has 
> been restored to /system-test-client-7f6795dfb8-v7hh8-region
> [warn 2021/08/09 11:02:19.742 GMT system-test-gemfire-server-2 
> <ClientHealthMonitor Thread> tid=0x4d] Server connection from 
> [identity(system-test-client-7f6795dfb8-pc8mv(SpringBasedClientCacheApplication:1:loner):34788:814b8d2a:SpringBasedClientCacheApplication,connection=1;
>  port=50264] is being terminated because its client timeout of 10000 has 
> expired.
> [warn 2021/08/09 11:02:19.744 GMT system-test-gemfire-server-2 
> <ClientHealthMonitor Thread> tid=0x4d] ClientHealthMonitor: Unregistering 
> client with member id 
> identity(system-test-client-7f6795dfb8-pc8mv(SpringBasedClientCacheApplication:1:loner):34788:814b8d2a:SpringBasedClientCacheApplication,connection=1
>  due to: Unknown reason
> [info 2021/08/09 11:02:19.745 GMT system-test-gemfire-server-2 <unicast 
> receiver,system-test-gemfire-server-2-56622> tid=0x1e] received suspect 
> message from 
> system-test-gemfire-locator-0(system-test-gemfire-locator-0:1:locator)<ec><v1>:41000
>  for system-test-gemfire-server-2(system-test-gemfire-server-2:1)<v3>:41000: 
> Member isn't responding to heartbeat requests
> [info 2021/08/09 11:02:19.747 GMT system-test-gemfire-server-2 <unicast 
> receiver,system-test-gemfire-server-2-56622> tid=0x1e] Membership received a 
> request to remove 
> system-test-gemfire-server-2(system-test-gemfire-server-2:1)<v3>:41000 from 
> system-test-gemfire-locator-1(system-test-gemfire-locator-1:1:locator)<ec><v0>:41000
>  reason=Member isn't responding to heartbeat requests
> [warn 2021/08/09 11:02:19.748 GMT system-test-gemfire-server-2 <StatSampler> 
> tid=0x38] Statistics sampling thread detected a wakeup delay of 29965 ms, 
> indicating a possible resource issue. Check the GC, memory, and CPU 
> statistics.
> ...
> [warn 2021/08/09 11:02:19.854 GMT system-test-gemfire-server-2 
> <ServerConnection on port 40404 Thread 9> tid=0x91] ClientHealthMonitor: 
> Unregistering client with member id 
> identity(system-test-client-7f6795dfb8-v7hh8(SpringBasedClientCacheApplication:1:loner):44012:ec3c8d2a:SpringBasedClientCacheApplication,connection=1
>  due to: The connection has been reset while reading the header
> [info 2021/08/09 11:02:19.867 GMT system-test-gemfire-server-2 <unicast 
> receiver,system-test-gemfire-server-2-56622> tid=0x1e] saving cache server 
> configuration for use with the cluster-configuration service on reconnect
> [info 2021/08/09 11:02:19.867 GMT system-test-gemfire-server-2 <unicast 
> receiver,system-test-gemfire-server-2-56622> tid=0x1e] cache server 
> configuration saved
> [fatal 2021/08/09 11:02:19.876 GMT system-test-gemfire-server-2 
> <ServerConnection on port 40404 Thread 14> tid=0xa9] Uncaught exception in 
> thread Thread[ServerConnection on port 40404 Thread 14,5,main]
> java.lang.NullPointerException
>       at 
> org.apache.geode.internal.cache.tier.sockets.ServerConnection.doNormalMessage(ServerConnection.java:865)
>       at 
> org.apache.geode.internal.cache.tier.sockets.ServerConnection.doOneMessage(ServerConnection.java:1022)
>       at 
> org.apache.geode.internal.cache.tier.sockets.ServerConnection.run(ServerConnection.java:1275)
>       at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>       at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>       at 
> org.apache.geode.internal.cache.tier.sockets.AcceptorImpl.lambda$initializeServerConnectionThreadPool$3(AcceptorImpl.java:690)
>       at 
> org.apache.geode.logging.internal.executors.LoggingThreadFactory.lambda$newThread$0(LoggingThreadFactory.java:120)
>       at java.base/java.lang.Thread.run(Thread.java:829)
> {noformat}
> The problem itself is really hard to reproduce: we only hit it twice in 
> around 200 runs. Logs and statistics can be found 
> [here|https://drive.google.com/file/d/1yHrEQYE2i2Hf6WJY2Dwyj2GqVoCJUoWf/view?usp=sharing].
> Even though the logs show little beyond what I've included above, I believe 
> there's a race condition between the {{handleTermination()}} and 
> {{doNormalMessage()}} methods within the {{ServerConnection}} class. One of 
> the last tasks within {{handleTermination}} is to set the 
> {{clientUserAuths}} attribute to {{null}}, and I haven't found any 
> synchronization around this.
> {noformat}
> void handleTermination(boolean timedOut) {
> ...
>     if (unregisterClient) {
>       // last serverconnection call all close on auth objects
>       cleanClientAuths();
>     }
>     clientUserAuths = null;
>     if (needsUnregister) {
>       acceptor.getClientHealthMonitor().removeConnection(proxyId, this);
>       if (unregisterClient) {
>         acceptor.getClientHealthMonitor().unregisterClient(proxyId, getAcceptor(),
>             clientDisconnectedCleanly, clientDisconnectedException);
>       }
>     }
> ...
> }
> {noformat}
> I might be wrong here, but I think that if {{handleTermination}} runs 
> interleaved with {{doNormalMessage}}, specifically _*after*_ we check whether 
> the member is shutting down but _*before*_ we get the actual subject, the 
> {{clientUserAuths}} attribute might already have been set to {{null}} by 
> {{handleTermination}}, and so the {{NullPointerException}} is thrown.
> —
> Below is a small test that can be used to easily reproduce the issue:
> {noformat}
> package org.apache.geode.internal.cache.tier.sockets;
> import static org.assertj.core.api.Assertions.assertThatCode;
> import static org.mockito.ArgumentMatchers.any;
> import static org.mockito.Mockito.mock;
> import static org.mockito.Mockito.spy;
> import static org.mockito.Mockito.when;
> import static org.mockito.quality.Strictness.LENIENT;
> import java.net.InetAddress;
> import java.net.Socket;
> import java.util.Random;
> import org.junit.Rule;
> import org.junit.Test;
> import org.junit.experimental.categories.Category;
> import org.mockito.junit.MockitoJUnit;
> import org.mockito.junit.MockitoRule;
> import org.apache.geode.distributed.internal.DistributionManager;
> import org.apache.geode.distributed.internal.InternalDistributedSystem;
> import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
> import org.apache.geode.internal.cache.InternalCache;
> import org.apache.geode.internal.cache.TXManagerImpl;
> import org.apache.geode.internal.cache.tier.Acceptor;
> import org.apache.geode.internal.cache.tier.CachedRegionHelper;
> import org.apache.geode.internal.cache.tier.CommunicationMode;
> import org.apache.geode.internal.cache.tier.MessageType;
> import org.apache.geode.internal.cache.tier.ServerSideHandshake;
> import org.apache.geode.internal.monitoring.ThreadsMonitoring;
> import org.apache.geode.internal.security.SecurityService;
> import org.apache.geode.internal.serialization.KnownVersion;
> import org.apache.geode.test.junit.categories.ClientServerTest;
> @Category(ClientServerTest.class)
> public class ServerConnectionNullPointerExceptionIntegrationTest {
>   @Rule
>   public MockitoRule mockitoRule = MockitoJUnit.rule().strictness(LENIENT);
>   @Test
>   public void nullPointerExceptionNotThrownWhenHandleTerminationIsInvokedInterleavedWithDoOneMessage() {
>     InetAddress inetAddress = mock(InetAddress.class);
>     AcceptorImpl acceptor = mock(AcceptorImpl.class);
>     Socket socket = mock(Socket.class);
>     InternalCache cache = mock(InternalCache.class);
>     CachedRegionHelper cachedRegionHelper = mock(CachedRegionHelper.class);
>     SecurityService securityService = mock(SecurityService.class);
>     when(securityService.isIntegratedSecurity()).thenReturn(true);
>     CacheServerStats stats = mock(CacheServerStats.class);
>     when(inetAddress.getHostAddress()).thenReturn("localhost");
>     when(socket.getInetAddress()).thenReturn(inetAddress);
>     InternalDistributedSystem internalDistributedSystem =
>         mock(InternalDistributedSystem.class);
>     DistributionManager distributionManager = mock(DistributionManager.class);
>     ThreadsMonitoring threadsMonitoring = mock(ThreadsMonitoring.class);
>     when(cachedRegionHelper.getCache()).thenReturn(cache);
>     when(cache.getInternalDistributedSystem()).thenReturn(internalDistributedSystem);
>     when(internalDistributedSystem.getDM()).thenReturn(distributionManager);
>     when(distributionManager.getThreadMonitoring()).thenReturn(threadsMonitoring);
>     when(cache.getCacheTransactionManager()).thenReturn(mock(TXManagerImpl.class));
>     ServerConnectionCollection mockCollection = spy(ServerConnectionCollection.class);
>     when(mockCollection.incrementConnectionsProcessing()).thenReturn(true);
>     ClientHealthMonitor mockClientHealthMonitor = mock(ClientHealthMonitor.class);
>     when(mockClientHealthMonitor.addConnection(any(), any())).thenReturn(mockCollection);
>     when(acceptor.getClientHealthMonitor()).thenReturn(mockClientHealthMonitor);
>     when(acceptor.getConnectionListener()).thenReturn(mock(ConnectionListener.class));
>     TestServerConnection testServerConnection = new TestServerConnection(socket, cache,
>         cachedRegionHelper, stats, 0, 0, null,
>         CommunicationMode.PrimaryServerToClient.getModeNumber(), acceptor, securityService);
>     assertThatCode(testServerConnection::run).doesNotThrowAnyException();
>   }
>   private static class TestMessage extends Message {
>     TestMessage() {
>       super(3, KnownVersion.CURRENT);
>       messageType = MessageType.REQUEST;
>       securePart = new Part();
>     }
>     @Override
>     public void receive() {
>     }
>   }
>   private static class TestServerConnection extends ServerConnection {
>     TestServerConnection(Socket socket, InternalCache internalCache,
>         CachedRegionHelper cachedRegionHelper, CacheServerStats stats, int hsTimeout,
>         int socketBufferSize, String communicationModeStr, byte communicationMode,
>         Acceptor acceptor, SecurityService securityService) {
>       super(socket, internalCache, cachedRegionHelper, stats, hsTimeout, socketBufferSize,
>           communicationModeStr, communicationMode, acceptor, securityService);
>       setClientDisconnectCleanly();
>     }
>     @Override
>     protected void doHandshake() {
>       ClientProxyMembershipID proxyID = mock(ClientProxyMembershipID.class);
>       ServerSideHandshake handshake = mock(ServerSideHandshake.class);
>       MessageIdExtractor extractor = mock(MessageIdExtractor.class);
>       when(handshake.getVersion()).thenReturn(KnownVersion.CURRENT);
>       when(proxyID.getDistributedMember()).thenReturn(mock(InternalDistributedMember.class));
>       setHandshake(handshake);
>       setProxyId(proxyID);
>       processHandShake();
>       setFakeRequest();
>       super.doHandshake();
>     }
>     // Naive approach to simulate that handleTermination was invoked in between
>     @Override
>     public long getUniqueId() {
>       handleTermination();
>       return new Random().nextLong();
>     }
>     private void setFakeRequest() {
>       TestMessage testMessage = new TestMessage();
>       setRequestMessage(testMessage);
>     }
>   }
> }
> {noformat}



