This is an automated email from the ASF dual-hosted git repository. nnag pushed a commit to branch support/1.14 in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.14 by this push: new b58bfc5 GEODE-8965: Support Redis-style OOM error message (#6085) (#6213) b58bfc5 is described below commit b58bfc50e70973ef56daff95bed8c5bbfc02d43c Author: Ray Ingles <ring...@pivotal.io> AuthorDate: Thu Apr 1 13:23:57 2021 -0400 GEODE-8965: Support Redis-style OOM error message (#6085) (#6213) - Allow execution of Redis commands in low-memory conditions - Test with large key size to guarantee OOM error - In tests use multiple key sizes to pack used memory more tightly - force multiple GCs as memory is filled, add memory-pressure thread in tests - Add memory pressure to del and expire tests Co-authored-by: Ray Ingles <ring...@vmware.com> (cherry picked from commit 08da3aaa1dd3dd5dd212353ecead32819e584c62) --- .../internal/cache/control/HeapMemoryMonitor.java | 3 +- .../cache/execute/AllowExecutionInLowMemory.java | 23 ++ .../apache/geode/redis/OutOfMemoryDUnitTest.java | 237 +++++++++++++++++++++ .../geode/redis/internal/RedisConstants.java | 2 + .../redis/internal/executor/RedisResponse.java | 4 + .../executor/SingleResultRedisFunction.java | 4 +- .../internal/netty/ExecutionHandlerContext.java | 3 + 7 files changed, 273 insertions(+), 3 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java index 70ded46..72d07bd 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java @@ -47,6 +47,7 @@ import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceType; import org.apache.geode.internal.cache.control.MemoryThresholds.MemoryState; import org.apache.geode.internal.cache.control.ResourceAdvisor.ResourceManagerProfile; +import org.apache.geode.internal.cache.execute.AllowExecutionInLowMemory; import org.apache.geode.internal.statistics.GemFireStatSampler; import org.apache.geode.internal.statistics.LocalStatListener; import org.apache.geode.internal.statistics.StatisticsManager; @@ -710,7 +711,7 @@ public class HeapMemoryMonitor implements NotificationListener, MemoryMonitor { public LowMemoryException createLowMemoryIfNeeded(Function function, Set<? extends DistributedMember> memberSet) { - if (function.optimizeForWrite() + if (function.optimizeForWrite() && !(function instanceof AllowExecutionInLowMemory) && !MemoryThresholds.isLowMemoryExceptionDisabled()) { Set<DistributedMember> criticalMembersFrom = getHeapCriticalMembersFrom(memberSet); if (!criticalMembersFrom.isEmpty()) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/AllowExecutionInLowMemory.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/AllowExecutionInLowMemory.java new file mode 100644 index 0000000..a5dadb6 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/AllowExecutionInLowMemory.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache.execute; + + +/** + * An internal marker interface used to allow functions to run in low-memory conditions. + */ +public interface AllowExecutionInLowMemory extends InternalFunction<Object[]> { + +} diff --git a/geode-redis/src/distributedTest/java/org/apache/geode/redis/OutOfMemoryDUnitTest.java b/geode-redis/src/distributedTest/java/org/apache/geode/redis/OutOfMemoryDUnitTest.java new file mode 100644 index 0000000..cd1677e --- /dev/null +++ b/geode-redis/src/distributedTest/java/org/apache/geode/redis/OutOfMemoryDUnitTest.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.redis; + +import static org.apache.geode.test.awaitility.GeodeAwaitility.await; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatNoException; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable; + +import java.util.Arrays; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.exceptions.JedisException; + +import org.apache.geode.test.awaitility.GeodeAwaitility; +import org.apache.geode.test.dunit.IgnoredException; +import org.apache.geode.test.dunit.rules.MemberVM; +import org.apache.geode.test.dunit.rules.RedisClusterStartupRule; +import org.apache.geode.test.junit.rules.ExecutorServiceRule; + +public class OutOfMemoryDUnitTest { + + @ClassRule + public static RedisClusterStartupRule clusterStartUp = new RedisClusterStartupRule(4); + + @Rule + public ExecutorServiceRule executor = new ExecutorServiceRule(); + + private static final String expectedEx = "Member: .*? above .*? critical threshold"; + private static final String FILLER_KEY = "fillerKey-"; + private static final String PRESSURE_KEY = "pressureKey-"; + private static final String LOCAL_HOST = "127.0.0.1"; + private static final int KEY_TTL_SECONDS = 10; + private static final int MAX_ITERATION_COUNT = 4000; + private static final int LARGE_VALUE_SIZE = 128 * 1024; + private static final int PRESSURE_VALUE_SIZE = 4 * 1024; + private static final int JEDIS_TIMEOUT = + Math.toIntExact(GeodeAwaitility.getTimeout().toMillis()); + private static Jedis jedis1; + private static Jedis jedis2; + + private static MemberVM server1; + private static MemberVM server2; + + private static Thread memoryPressureThread; + + @BeforeClass + public static void classSetup() { + IgnoredException.addIgnoredException(expectedEx); + + MemberVM locator = clusterStartUp.startLocatorVM(0); + + Properties serverProperties = new Properties(); + server1 = clusterStartUp.startRedisVM(1, serverProperties, locator.getPort()); + server2 = clusterStartUp.startRedisVM(2, serverProperties, locator.getPort()); + + server1.getVM().invoke(() -> RedisClusterStartupRule.getCache().getResourceManager() + .setCriticalHeapPercentage(5.0F)); + server2.getVM().invoke(() -> RedisClusterStartupRule.getCache().getResourceManager() + .setCriticalHeapPercentage(5.0F)); + + int redisServerPort1 = clusterStartUp.getRedisPort(1); + int redisServerPort2 = clusterStartUp.getRedisPort(2); + + jedis1 = new Jedis(LOCAL_HOST, redisServerPort1, JEDIS_TIMEOUT); + jedis2 = new Jedis(LOCAL_HOST, redisServerPort2, JEDIS_TIMEOUT); + } + + @Before + public void testSetup() { + jedis1.flushAll(); + } + + @AfterClass + public static void tearDown() { + jedis1.disconnect(); + jedis2.disconnect(); + + server1.stop(); + server2.stop(); + } + + @Test + public void shouldReturnOOMError_forWriteOperations_whenThresholdReached() + throws InterruptedException { + IgnoredException.addIgnoredException(expectedEx); + IgnoredException.addIgnoredException("LowMemoryException"); + + memoryPressureThread = new Thread(makeMemoryPressureRunnable()); + memoryPressureThread.start(); + + fillMemory(jedis2, false); + + assertThatThrownBy(() -> jedis2.set("oneMoreKey", makeLongStringValue(2 * LARGE_VALUE_SIZE))) + .hasMessageContaining("OOM"); + + memoryPressureThread.interrupt(); + memoryPressureThread.join(); + } + + @Test + public void shouldAllowDeleteOperations_afterThresholdReached() throws InterruptedException { + IgnoredException.addIgnoredException(expectedEx); + IgnoredException.addIgnoredException("LowMemoryException"); + + memoryPressureThread = new Thread(makeMemoryPressureRunnable()); + memoryPressureThread.start(); + + fillMemory(jedis2, false); + + assertThatNoException().isThrownBy(() -> jedis2.del(FILLER_KEY + 1)); + + memoryPressureThread.interrupt(); + memoryPressureThread.join(); + } + + @Test + public void shouldAllowExpiration_afterThresholdReached() throws InterruptedException { + IgnoredException.addIgnoredException(expectedEx); + IgnoredException.addIgnoredException("LowMemoryException"); + + memoryPressureThread = new Thread(makeMemoryPressureRunnable()); + memoryPressureThread.start(); + + fillMemory(jedis2, true); + + await().untilAsserted(() -> { + assertThat(jedis2.ttl(FILLER_KEY + 1)).isEqualTo(-2); + }); + + memoryPressureThread.interrupt(); + memoryPressureThread.join(); + } + + // TODO: test that write operations become allowed after memory has dropped + // below critical levels. Difficult to do right now because of vagaries of the + // Java garbage collector. + + private void fillMemory(Jedis jedis, boolean withExpiration) { + String valueString; + int valueSize = LARGE_VALUE_SIZE; + + while (valueSize > 1) { + forceGC(); // Helps ensure we really do fill all available memory + valueString = makeLongStringValue(LARGE_VALUE_SIZE); + addMultipleKeys(jedis, valueString, withExpiration); + valueSize /= 2; + } + } + + private void addMultipleKeys(Jedis jedis, String valueString, boolean withExpiration) { + // count is final because it is never reassigned + AtomicInteger count = new AtomicInteger(); + + Throwable thrown = catchThrowable(() -> { + for (count.set(0); count.get() < MAX_ITERATION_COUNT; count.incrementAndGet()) { + setRedisKeyAndValue(jedis, withExpiration, valueString, count.get()); + } + }); + + assertThat(thrown) + .isInstanceOf(Exception.class) + .hasMessageContaining("OOM command not allowed"); + + assertThat(count.get()).isLessThan(MAX_ITERATION_COUNT); + } + + private void setRedisKeyAndValue(Jedis jedis, boolean withExpiration, String valueString, + int keyNumber) { + if (withExpiration) { + jedis.setex(FILLER_KEY + keyNumber, KEY_TTL_SECONDS, valueString); + } else { + jedis.set(FILLER_KEY + keyNumber, valueString); + } + } + + private static String makeLongStringValue(int requestedSize) { + char[] largeCharData = new char[requestedSize]; + Arrays.fill(largeCharData, 'a'); + return new String(largeCharData); + } + + private static Runnable makeMemoryPressureRunnable() { + return new Runnable() { + boolean running = true; + String pressureValue = makeLongStringValue(PRESSURE_VALUE_SIZE); + + @Override + public void run() { + int i = 0; + while (running) { + if (Thread.currentThread().isInterrupted()) { + running = false; + break; + } + try { + jedis1.set(PRESSURE_KEY + i, pressureValue); + } catch (JedisException je) { + // Ignore, keep trying to fill memory + } + i++; + } + } + }; + } + + private void forceGC() { + server1.getVM().invoke(() -> { + Runtime.getRuntime().gc(); + }); + server2.getVM().invoke(() -> { + Runtime.getRuntime().gc(); + }); + } +} diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisConstants.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisConstants.java index 0a86362..313969c 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisConstants.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisConstants.java @@ -49,6 +49,8 @@ public class RedisConstants { public static final String ERROR_SYNTAX = "syntax error"; public static final String ERROR_INVALID_EXPIRE_TIME = "invalid expire time in set"; public static final String ERROR_NOT_A_VALID_FLOAT = "value is not a valid float"; + public static final String ERROR_OOM_COMMAND_NOT_ALLOWED = + "command not allowed when used memory > 'maxmemory'"; public static final String ERROR_UNKNOWN_SLOWLOG_SUBCOMMAND = "Unknown subcommand or wrong number of arguments for '%s'. Try SLOWLOG HELP."; diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisResponse.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisResponse.java index f6e909e..44c9f2d 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisResponse.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisResponse.java @@ -121,6 +121,10 @@ public class RedisResponse { return new RedisResponse((buffer) -> Coder.getErrorResponse(buffer, error)); } + public static RedisResponse oom(String error) { + return new RedisResponse((bba) -> Coder.getOOMResponse(bba, error)); + } + public static RedisResponse customError(String error) { return new RedisResponse((buffer) -> Coder.getCustomErrorResponse(buffer, error)); } diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/SingleResultRedisFunction.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/SingleResultRedisFunction.java index ed1ae1a..5cbf5e7 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/SingleResultRedisFunction.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/SingleResultRedisFunction.java @@ -18,12 +18,12 @@ package org.apache.geode.redis.internal.executor; import org.apache.geode.cache.Region; import org.apache.geode.cache.execute.FunctionContext; import org.apache.geode.internal.cache.PartitionedRegion; -import org.apache.geode.internal.cache.execute.InternalFunction; +import org.apache.geode.internal.cache.execute.AllowExecutionInLowMemory; import org.apache.geode.internal.cache.execute.RegionFunctionContextImpl; import org.apache.geode.redis.internal.data.RedisData; import org.apache.geode.redis.internal.data.RedisKey; -public abstract class SingleResultRedisFunction implements InternalFunction<Object[]> { +public abstract class SingleResultRedisFunction implements AllowExecutionInLowMemory { private static final long serialVersionUID = 3239452234149879302L; private final transient PartitionedRegion partitionedRegion; diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java index 4ca7359..3c9056b 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java @@ -40,6 +40,7 @@ import org.apache.logging.log4j.Logger; import org.apache.geode.ForcedDisconnectException; import org.apache.geode.cache.CacheClosedException; +import org.apache.geode.cache.LowMemoryException; import org.apache.geode.cache.execute.FunctionException; import org.apache.geode.cache.execute.FunctionInvocationTargetException; import org.apache.geode.distributed.DistributedSystemDisconnectedException; @@ -233,6 +234,8 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter { response = RedisResponse.error(cause.getMessage()); } else if (cause instanceof RedisDataTypeMismatchException) { response = RedisResponse.wrongType(cause.getMessage()); + } else if (cause instanceof LowMemoryException) { + response = RedisResponse.oom(RedisConstants.ERROR_OOM_COMMAND_NOT_ALLOWED); } else if (cause instanceof DecoderException && cause.getCause() instanceof RedisCommandParserException) { response = RedisResponse.error(RedisConstants.PARSING_EXCEPTION_MESSAGE);