This is an automated email from the ASF dual-hosted git repository.

agingade pushed a commit to branch support/1.13 in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.13 by this push: new 0fe6518 GEODE-8394: Rewind the message Part on command failure (#5424) 0fe6518 is described below commit 0fe6518851d7bdd85d544b8f3c0647ab9053c891 Author: agingade <aging...@pivotal.io> AuthorDate: Fri Aug 7 10:12:27 2020 -0700 GEODE-8394: Rewind the message Part on command failure (#5424) GEODE-8394: Rewind the message Part on failure Co-authored-by: anilkumar gingade <anil@anilg.local> (cherry picked from commit 83d1e28a953b7d73e7f499f9013540bedd0bd472) --- .../ClientServerCacheOperationDUnitTest.java | 204 +++++++++++++++++++++ .../geode/internal/cache/tier/sockets/Part.java | 21 ++- .../internal/cache/tier/sockets/PartTest.java | 105 +++++++++++ 3 files changed, 324 insertions(+), 6 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache30/ClientServerCacheOperationDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache30/ClientServerCacheOperationDUnitTest.java new file mode 100644 index 0000000..4bfa3cc --- /dev/null +++ b/geode-core/src/distributedTest/java/org/apache/geode/cache30/ClientServerCacheOperationDUnitTest.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.cache30; + +import static org.apache.geode.cache.RegionShortcut.REPLICATE; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionFactory; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.cache.client.Pool; +import org.apache.geode.cache.client.PoolManager; +import org.apache.geode.cache.client.ServerConnectivityException; +import org.apache.geode.cache.server.CacheServer; +import org.apache.geode.test.dunit.DistributedTestUtils; +import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.dunit.rules.CacheRule; +import org.apache.geode.test.dunit.rules.ClientCacheRule; +import org.apache.geode.test.dunit.rules.DistributedRule; +import org.apache.geode.test.junit.categories.ClientServerTest; + +@Category({ClientServerTest.class}) +public class ClientServerCacheOperationDUnitTest implements Serializable { + + private String regionName = "CsTestRegion"; + + @Rule + public DistributedRule distributedRule = new DistributedRule(); + + @Rule + public CacheRule cacheRule = new CacheRule(); + + @Rule + public ClientCacheRule clientCacheRule = new ClientCacheRule(); + + @Test + public void largeObjectPutWithReadTimeoutThrowsException() { + VM server1 = VM.getVM(0); + VM server2 = VM.getVM(1); + VM client = VM.getVM(2); + + final int byteSize = 40 * 1000 * 1000; + final int listSize = 2; + final int locatorPort = DistributedTestUtils.getLocatorPort(); + + server1.invoke(() -> createServerCache()); + server2.invoke(() -> createServerCache()); + + server1.invoke(() -> { + RegionFactory<?, ?> regionFactory = 
cacheRule.getCache().createRegionFactory(REPLICATE); + regionFactory.create(regionName); + }); + + server2.invoke(() -> { + RegionFactory<?, ?> regionFactory = cacheRule.getCache().createRegionFactory(REPLICATE); + regionFactory.create(regionName); + }); + + List<byte[]> list = new ArrayList(listSize); + + for (int i = 0; i < listSize; i++) { + list.add(new byte[byteSize]); + } + + client.invoke(() -> { + clientCacheRule.createClientCache(); + + Pool pool = PoolManager.createFactory() + .addLocator("localhost", locatorPort) + .setSocketBufferSize(50) + .setReadTimeout(40) + .setPingInterval(200) + .setSocketConnectTimeout(50) + .setServerConnectionTimeout(50) + .create("testPool"); + + Region region = clientCacheRule.getClientCache() + .createClientRegionFactory(ClientRegionShortcut.PROXY) + .setPoolName(pool.getName()) + .create(regionName); + + assertThatThrownBy(() -> region.put("key", list)) + .isInstanceOf(ServerConnectivityException.class); + + }); + + server1.invoke(() -> { + Region region = cacheRule.getCache().getRegion(regionName); + List value = (List) region.get("key"); + if (value != null) { + assertThat(value.size()).isEqualTo(listSize); + list.forEach((b) -> assertThat(b.length).isEqualTo(byteSize)); + } + }); + + client.invoke(() -> { + Region region = clientCacheRule.getClientCache().getRegion(regionName); + assertThat(region.size()).isEqualTo(0); + List value = (List) region.get("key"); + if (value != null) { + assertThat(value.size()).isEqualTo(listSize); + list.forEach((b) -> assertThat(b.length).isEqualTo(byteSize)); + } + }); + + } + + @Test + public void largeObjectGetWithReadTimeout() { + VM server1 = VM.getVM(0); + VM server2 = VM.getVM(1); + VM server3 = VM.getVM(2); + VM client = VM.getVM(3); + + final int locatorPort = DistributedTestUtils.getLocatorPort(); + + server1.invoke(() -> createServerCache()); + server2.invoke(() -> createServerCache()); + server3.invoke(() -> createServerCache()); + + server1.invoke(() -> { + RegionFactory<?, 
?> regionFactory = cacheRule.getCache().createRegionFactory(REPLICATE); + regionFactory.create(regionName); + }); + + server2.invoke(() -> { + RegionFactory<?, ?> regionFactory = cacheRule.getCache().createRegionFactory(REPLICATE); + regionFactory.create(regionName); + }); + + server3.invoke(() -> { + RegionFactory<?, ?> regionFactory = cacheRule.getCache().createRegionFactory(REPLICATE); + Region region = regionFactory.create(regionName); + + int listSize = 2; + List list = new ArrayList(listSize); + + for (int i = 0; i < listSize; i++) { + list.add(new byte[75 * 1000 * 1000]); + } + + region.put("key", list); + }); + + server1.invoke(() -> { + Region region = cacheRule.getCache().getRegion(regionName); + + assertThat(region.size()).isEqualTo(1); + }); + + client.invoke(() -> { + clientCacheRule.createClientCache(); + + Pool pool = PoolManager.createFactory() + .addLocator("localhost", locatorPort) + .setSocketBufferSize(100) + .setReadTimeout(50) + .create("testPool"); + + Region region = clientCacheRule.getClientCache() + .createClientRegionFactory(ClientRegionShortcut.PROXY) + .setPoolName(pool.getName()) + .create(regionName); + + region.get("key"); + assertThat(region.size()).isEqualTo(0); + + Object value = region.get("key"); + + assertThat(value).isInstanceOf(List.class); + }); + + } + + private void createServerCache() throws IOException { + cacheRule.createCache(); + CacheServer cacheServer = cacheRule.getCache().addCacheServer(); + cacheServer.setPort(0); + cacheServer.start(); + } + +} diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Part.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Part.java index f833f05..9654cee 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Part.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Part.java @@ -401,8 +401,11 @@ public class Part { } } else { HeapDataOutputStream hdos = 
(HeapDataOutputStream) this.part; - hdos.sendTo(out, buf); - hdos.rewind(); + try { + hdos.sendTo(out, buf); + } finally { + hdos.rewind(); + } } } } @@ -431,8 +434,11 @@ public class Part { } } else { HeapDataOutputStream hdos = (HeapDataOutputStream) this.part; - hdos.sendTo(buf); - hdos.rewind(); + try { + hdos.sendTo(buf); + } finally { + hdos.rewind(); + } } } } @@ -497,8 +503,11 @@ public class Part { } } else { HeapDataOutputStream hdos = (HeapDataOutputStream) this.part; - hdos.sendTo(sc, buf); - hdos.rewind(); + try { + hdos.sendTo(sc, buf); + } finally { + hdos.rewind(); + } } } } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/PartTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/PartTest.java index d744218..2d882b2 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/PartTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/PartTest.java @@ -16,16 +16,24 @@ package org.apache.geode.internal.cache.tier.sockets; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.catchThrowable; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import java.io.EOFException; import java.io.OutputStream; +import java.nio.BufferOverflowException; import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.apache.geode.internal.HeapDataOutputStream; import org.apache.geode.test.junit.categories.ClientServerTest; @Category({ClientServerTest.class}) @@ -76,4 +84,101 @@ public class PartTest { assertThatThrownBy(() -> 
part.getCachedString()) .hasMessageContaining("expected String part to be of type BYTE, part ="); } + + @Test + public void writeToOutputStreamResetsPartOnException() throws Exception { + HeapDataOutputStream heapDataOutputStream = mock(HeapDataOutputStream.class); + when(heapDataOutputStream.size()).thenReturn(1000); + OutputStream outputStream = mock(OutputStream.class); + ByteBuffer byteBuffer = mock(ByteBuffer.class); + doThrow(new EOFException("test")).when(heapDataOutputStream).sendTo(eq(outputStream), + eq(byteBuffer)); + + Part part = new Part(); + part.setPartState(heapDataOutputStream, false); + + Throwable thrown = catchThrowable(() -> part.writeTo(outputStream, byteBuffer)); + + assertThat(thrown).isInstanceOf(EOFException.class); + verify(heapDataOutputStream, times(1)).rewind(); + } + + @Test + public void writeToOutputStreamResetsPartOnSuccess() throws Exception { + HeapDataOutputStream heapDataOutputStream = mock(HeapDataOutputStream.class); + when(heapDataOutputStream.size()).thenReturn(1000); + OutputStream outputStream = mock(OutputStream.class); + ByteBuffer byteBuffer = mock(ByteBuffer.class); + + Part part = new Part(); + part.setPartState(heapDataOutputStream, false); + + part.writeTo(outputStream, byteBuffer); + + verify(heapDataOutputStream, times(1)).rewind(); + } + + @Test + public void writeToByteBufferResetsPartOnException() throws Exception { + HeapDataOutputStream heapDataOutputStream = mock(HeapDataOutputStream.class); + when(heapDataOutputStream.size()).thenReturn(1000); + ByteBuffer byteBuffer = mock(ByteBuffer.class); + doThrow(new BufferOverflowException()).when(heapDataOutputStream).sendTo(eq(byteBuffer)); + + Part part = new Part(); + part.setPartState(heapDataOutputStream, false); + + Throwable thrown = catchThrowable(() -> part.writeTo(byteBuffer)); + + assertThat(thrown).isInstanceOf(BufferOverflowException.class); + verify(heapDataOutputStream, times(1)).rewind(); + } + + @Test + public void 
writeToByteBufferResetsPartOnSuccess() throws Exception { + HeapDataOutputStream heapDataOutputStream = mock(HeapDataOutputStream.class); + when(heapDataOutputStream.size()).thenReturn(1000); + ByteBuffer byteBuffer = mock(ByteBuffer.class); + + Part part = new Part(); + part.setPartState(heapDataOutputStream, false); + + part.writeTo(byteBuffer); + + verify(heapDataOutputStream, times(1)).rewind(); + } + + @Test + public void writeToSocketChannelResetsPartOnException() throws Exception { + HeapDataOutputStream heapDataOutputStream = mock(HeapDataOutputStream.class); + when(heapDataOutputStream.size()).thenReturn(1000); + SocketChannel socketChannel = mock(SocketChannel.class); + ByteBuffer byteBuffer = mock(ByteBuffer.class); + doThrow(new BufferOverflowException()).when(heapDataOutputStream).sendTo(eq(socketChannel), + eq(byteBuffer)); + + Part part = new Part(); + part.setPartState(heapDataOutputStream, false); + + Throwable thrown = catchThrowable(() -> part.writeTo(socketChannel, byteBuffer)); + + assertThat(thrown).isInstanceOf(BufferOverflowException.class); + verify(heapDataOutputStream, times(1)).rewind(); + } + + @Test + public void writeToSocketChannelResetsPartOnSuccess() throws Exception { + HeapDataOutputStream heapDataOutputStream = mock(HeapDataOutputStream.class); + when(heapDataOutputStream.size()).thenReturn(1000); + SocketChannel socketChannel = mock(SocketChannel.class); + ByteBuffer byteBuffer = mock(ByteBuffer.class); + + Part part = new Part(); + part.setPartState(heapDataOutputStream, false); + + part.writeTo(socketChannel, byteBuffer); + + verify(heapDataOutputStream, times(1)).rewind(); + } + }